kafka系列

Kafka 原理分析

Posted by Jason Lee on 2019-09-18

Kafka概述

Kafka 定义

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于 大数据实时处理领域。

消息队列好处

可以参考我的另一篇博文MQ可以解决哪些实际问题

优缺点

  • 优点:

    • 可扩展。Kafka集群可以透明的扩展,增加新的服务器进集群。
    • 高性能。Kafka性能远超过传统的ActiveMQ、RabbitMQ等,Kafka支持Batch操作。
    • 容错性。Kafka每个Partition数据会复制到几台服务器,当某个Broker失效时,Zookeeper将通知生产者和消费者从而使用其他的Broker。
  • 缺点:

    • 重复消息。Kafka保证每条消息至少送达一次,虽然几率很小,但一条消息可能被送达多次。
    • 消息乱序。Kafka某一个固定的Partition内部的消息是保证有序的,如果一个Topic有多个Partition,partition之间的消息送达不保证有序。
    • 复杂性。Kafka需要Zookeeper的支持,Topic一般需要人工创建,部署和维护比一般MQ成本更高。

Kafka应用场景

Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。在大数据领域的数据采集。离线数据采集等日志收集方面,Kafka几乎是规范。
两个作用:

  • 降低系统组网复杂度。
    降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的作用。

  • Kafka主要特点:

    1. 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
    2. 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。
    3. 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
    4. 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
    5. 支持online和offline的场景。
  • Kafka的设计要点:

    1. 直接使用linux 文件系统的cache,来高效缓存数据。
    2. 采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少 为2次。根据测试结果,可以提高60%的数据发送性能。
    3. Zero-Copy详细的技术

Kafka架构

Kafka架构图

  1. Producer :消息生产者,就是向 kafka broker 发消息的客户端;
  2. Consumer :消息消费者,向 kafka broker 取消息的客户端;
  3. Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负 责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所 有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  4. Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  5. Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  6. Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  7. Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower。
  8. leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对 象都是 leader。
  9. follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 follower。

Kafka深入理解

Kafka 生产者

Kafka 工作流程及文件存储机制


Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己 消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

Kafka的日志存贮

在kafka的配置文件当中有个名叫log.dirs=/opt/module/kafka/logs 的配置文件,主要是存放的是 partition 用于存放日志文件
来看一下日志文件的分布图

partiton的分布

下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:

  • (1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  • (2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
  • (3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后 面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的round-robin 算法。

ptopic中partition存储分布

下面示意图形象说明了partition中文件存储方式:

每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。

每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

  • partiton中segment文件存储结构

    • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.

    • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

    建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据,如下图2所示segment文件列表形象说明了上述2个规则:

    接下来我们看一下 kafka segment 文件的详解:

    上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。 其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。

    从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构如下:

    关键字解释说明
    8 byte offset在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
    4 byte message sizemessage大小
    4 byte CRC32用crc32校验message
    1 byte “magic”表示本次发布Kafka服务程序协议版本号
    1 byte “attributes”表示为独立版本、或标识压缩类型、或编码类型。
    4 byte key length表示key的长度,当key为-1时,K byte key字段不填
    K byte key可选
    value bytes payload表示实际消息数据。

在partition中如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

  • 第一步查找segment file 上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log

  • 第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

数据可靠性保证

  • 副本数据同步策略以及

为了提高消息的可靠性,Kafka每个topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个broker失效情况下仍然保证服务可用。

在Kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。

假设我们有3个border, id 分别为 0,1,2,分别将信息放在 border0, border1, border2 目录下。此时我们创建一个 my-replicated-topic主题 我们将topic 分为3个分区和3个副本执行

bin/kafka-topics.sh --create --bootstrap-server localhost:9000 --replication-factor 3 --partitions 3 --topic my-replicated-topic
然后我们查看一下 topic 的分布情况

bin/kafka-topics.sh --describe --bootstrap-server localhost:9000 --topic my-replicated-topic

1
2
3
4
 Topic:my-replicated-topic	PartitionCount:3	ReplicationFactor:3	Configs:segment.bytes=536870912
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 0,1,2
Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2
Topic: my-replicated-topic Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2

我来解释一下 Partition 0 ,代表0号分区,由于我们有3个分区和3个副本,所以Replicas中为 0号分区所在的border 分别为 2,1,0 而leader :2 代表2 好分区负责读写 至于这里头个ISR 后面在继续分析

  • ACK应答机制

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

kafka 提供了三种ACK机制

  • 0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

  • 1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;

  • -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会 造成数据重复。

  • ISR机制
    上述过程有个问题,即当ack设置为all的时候,leader 收到数据,所有 follower 都开始同步数据, 但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去, 直到它完成同步,才能发送 ack。这个问题怎么解决呢?

    在zk中会保存AR(Assigned Replicas)列表,其中包含了分区所有的副本,其中 AR = ISR+OSR

    ISR(in sync replica)

    是kafka动态维护的一组同步副本,在ISR中有成员存活时,只有这个组的成员才可以成为leader,内部保存的为每次提交信息时必须同步的副本(acks = all时),每当leader挂掉时,在ISR集合中选举出一个follower作为leader提供服务,当ISR中的副本被认为坏掉的时候,会被踢出ISR,当重新跟上leader的消息数据时,重新进入ISR。

    当然 ISR 在这里可以认为是活跃度border节点,当ack=all 的时候,只要保证所有isr中的节点同步完毕之后,就可以回复ACK消息。

    kafka 会自己维护ISR里面border节点。kafka会根据中zk中的数据对border的活跃性做一个判断

    • 第一点:一个节点必须维持和zk的会话,通过zk的心跳检测实现
    • 第二点:如果节点是一个slave也就是复制节点,那么他必须复制leader节点不能太落后。这里的落后可以指两种情况
      • 1:数据复制落后,slave节点和leader节点的数据相差较大,这种情况有一个缺点,在生产者突然发送大量消息导致网络堵塞后,大量的slav复制受阻,导致数据复制落后被大量的踢出ISR。
      • 2:时间相差过大,指的是slave向leader请求复制的时间距离上次请求相隔时间过大。通过配置replica.lag.time.max就可以配置这个时间参数。这种方式解决了上述第一种方式导致的问题。

注意:最新的版本的kafka 已经干掉了 第一个中情况,采用了第二种时间的延迟的方式来维护ISR列表,原因是当kafka分批次发送消息,恰好这批消息是的 leader offset 大于ISR的临界值,那么kafka就会将所有的ISR 节点提出ISR列表,当同步完成之后,所有的follow 又和leader的值相等,这样就会将所有的节点加入ISR 这样一来,kafka 就会频繁的删减ISR列表

OSR(out sync replica): 保存的副本不必保证必须同步完成才进行确认,OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。

  • 如果ISR内的全部副本挂掉怎么办?
    • 服务直接不可用一段时间等待ISR中副本恢复

      服务不可用方式这种适用在不允许消息丢失的情况下使用,适用于一致性大于可用性,可以有两种做法:

      • 设置ISR最小同步副本数量,如果ISR的当前数量大于设置的最小同步值,那么该分区才会接受写入,避免了ISR同步副本过少。如果小于最小值那么该分区将不接收写入。这个最小值设置只有在acks = all的时候才会生效。
    • 禁用unclean-leader选举,当isr中的所有副本全部不可用时,不可以使用OSR 中的副本作为leader,直接使服务不可用,直到等到ISR 中副本恢复再进行选举leader。

直接选择第一个副本作为leader的方式,适用于可用性大于一致性的场景,这也是kafka在isr中所有副本都死亡了的情况采用的默认处理方式,我们可以通过配置参数unclean.leader.election.enable来禁止这种行为,采用第一种方法。

  • ISR同步机制

  • base offset:起始位移,replica中第一天消息的offset

  • HW:replica高水印值,副本中最新一条已提交消息的位移。leader 的HW值也就是实际已提交消息的范围,每个replica都有HW值,但仅仅leader中的HW才能作为标示信息。什么意思呢,就是说当按照参数标准成功完成消息备份(成功同步给follower replica后)才会更新HW的值,代表消息理论上已经不会丢失,可以认为“已提交”。(也可以理解为指的是消费者能见到的最大的 offset)

  • LEO:日志末端位移,也就是replica中下一条待写入消息的offset,注意哈,是下一条并且是待写入的,并不是最后一条。

所以HW代表已经完成同步的数据的位置,LEO代表已经写入的最新位置,只有HW位置之前的才是可以被外界访问的数据。
现在就来看一下之前,broker从收到消息到返回响应这个黑盒子里发生了什么。

  • (1)follower 故障
    follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘 记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加入 ISR 了。

  • (2)leader 故障
    leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的 更多数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。
    注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Exactly Once 语义

  • At Least Once and At Most Once

    将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被 发送一次,即 At Most Once 语义。

  • At Least Once

    可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once 可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说 交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版 本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局 去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论 向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语 义,就构成了 Kafka 的 Exactly Once 语义。即:
At Least Once + 幂等性 = Exactly Once

要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka
的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。

开启幂等性的 Producer 在 初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而 Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只 会持久化一条。

但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨 分区跨会话的 Exactly Once。

  • Exactly Once

Kafka 生产者

消费方式

consumer 采用 pull(拉)模式从 broker 中读取数据。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数 据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有 数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

消费者组

Kafka 存在 Consumer Group的概念,也就是 group.id 一样的 Consumer,这些 Consumer 属于同一个Consumer Group,组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
当然,每个分区只能由同一个消费组内的一个consumer来消费。那么问题来了,同一个 Consumer Group 里面的 Consumer 是如何知道该消费哪些分区里面的数据呢?

分区分配策略

在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:

  • 同一个 Consumer Group 内新增消费者
  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
  • 订阅的主题新增分区
  • 将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到本文提到的分区分配策略。

下面我们将详细介绍 Kafka 内置的两种分区分配策略。本文假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据,而且 C1 的 num.streams = 1,C2 的 num.streams = 2。

Kafka 有两种分配策略,一是 RoundRobin,一是 Range

  • RoundRobin

使用RoundRobin策略有两个前提条件必须满足:

  • 同一个Consumer Group里面的所有消费者的num.streams必须相等;
  • 每个消费者订阅的主题必须相同。

RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,然后按照轮询的方式进行分配

  • Range

Range策略是对每个主题而言的

  • 首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
  • 在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。
  • 然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。
  • 如果除不尽,那么前面几个消费者线程将会多消费一个分区。

在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

C1-0 将消费 0, 1, 2, 3 分区

C2-0 将消费 4, 5, 6 分区

C2-1 将消费 7, 8, 9 分区

假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:

C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区

C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区

C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。

Offset

Offset 存贮

在kafka 0.9版本之后,kafka为了降低zookeeper的io读写,减少network data transfer,也自己实现了在kafka server上存储consumer,topic,partitions,offset信息将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。
offset提交消息会根据消费组的key(消费组名称)进行分区. 对于一个给定的消费组,它的所有消息都会发送到唯一的broker,
这对offset的抓取请求会更加容易,因为不需要以分散-收集的方式对多个brokers发送请求并收集结果(只针对一个broker).

Offset 提交方式

  • 自动提交

    Kafka中偏移量的自动提交是由参数enable_auto_commitauto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。

    如:group_id=test_group_1,则partation=hash(“test_group_1”)%50=28

  • 手动提交

    对于自动提交偏移量,如果auto_commit_interval_ms的值设置的过大,当消费者在自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好。

    鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。

    对于手动提交offset主要有3种方式:

    1. 同步提交 (consumer.commitSync())

      虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞
      吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

    2. 异步提交 consumer.commitAsync()

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先
提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据 的重复消费。

  • 自动管理

    Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
    offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。

    • 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发 生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。

    • 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费

    kafka 提供了ConsumerRebalanceListener 类来监控 rebanlace

    • 当我们发生rebanlace 的时候需要将将offset 从落地库中取出来,最典型的应用就是将消息的落地和offset的提交做成一个事物。

参考



支付宝打赏 微信打赏

赞赏一下