1. Kafka架构深入
1.1 Kafka 工作流程及文件存储机制
kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个log文件,该log文件中存储的就是producer生产的数据。producer生产的数据会被不断追加到log文件的末端,且每条数据都有自己的offset
offset是一个long型的数字,通过这个offset可以确定一条在该partition下的唯一消息。在partition下是保证有序的,但是在topic下面没有保证有序性
消费者组中的每个消费者,都会实时记录自己消费到哪个offset以便出错恢复,从上次的位置继续消费
由于生产者生产的消息会不断追加到log文件末端,为防止log文件过大导致数据定位效率低,kafka采取了分片和索引机制,将每个partition分为多个segment(逻辑上的概念,index+log文件)
每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(片段)数据文件中(每个segment文件中消息数量不一定相等),这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定
每个segment对应两个文件—-“.index”和“.log”文件。分别表示为segment索引文件和数据文件(引入索引文件的目的就是便于利用二分查找快速定位message位置)。这两个文件的命名规则为:
partition全局的第一个segment从0开始,后续每个segment文件名以当前segment的第一条消息的offset命名,数值大小为64位,20位数字字符长度,没有数字用0填充。
这些文件位于一个文件夹下(partition目录),该文件夹的命名规则:topic名+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index 和log 文件以当前segment 的第一条消息的offset 命名。下图为index 文件和log 文件的结构示意图 “.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message 的物理偏移地址。 index里面不仅存着message的起始偏移量,还存着message的大小,比如说message3大小1000k,那么只要在log文件里扫描756-1756就能快速定位message3 的位置
1.2 Kafka生产者
1.2.1 分区策略
1) 分区的原因
(1) 方便在集群中扩展,每个Partition 可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition 组成,因此整个集群就可以适应任意大小的数据了; (2) 可以提高并发,因为可以以Partition 为单位读写了。
2) 分区的原则
我们需要将producer 发送的数据封装成一个**ProducerRecord **对象。 (1) 指明partition 的情况下,直接将指明的值直接作为partiton 值; (2) 没有指明partition 值但有key 的情况下,将key 的hash 值与topic 的partition数进行取余得到partition 值; (3)既没有partition 值又没有key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic 可用的partition 总数取余得到partition值,也就是常说的round-robin 算法。
1.2.2 数据可靠性保证
为保证producer 发送的数据,能可靠的发送到指定的topic,topic 的每个partition 收到producer 发送的数据后,都需要向producer 发送ack(acknowledgement 确认收到),如果producer 收到ack,就会进行下一轮的发送,否则重新发送数据。
1) 副本数据同步策略
| 方案 | 优点 | 缺点 | | — | — | — | | 半数以上完成同步,就发送 ack | 延迟低 | 选举新的 leader 时,容忍 n 台节点的故障,需要 2n+1 个副 本 | | 全部完成同步,才发送 ack | 选举新的 leader 时,容忍 n 台节点的故障,需要 n+1 个副 本 | 延迟高 |
Kafka 选择了第二种方案,原因如下: (1)同样为了容忍n 台节点的故障,第一种方案需要2n+1 个副本,而第二种方案只需要n+1个副本,而Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。 (2)虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka 的影响较小
2) ISR
采用第二种方案之后,设想以下情景:leader 收到数据,所有follower 都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢? Leader 维护了一个动态的in-sync replica set (ISR),意为和leader 保持同步的follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间未向leader 同步数据,则该follower 将被踢出ISR ,该时间阈值由**replica.lag.time.max.ms **参数设定。Leader 发生故障之后,就会从ISR 中选举新的leader。
3) ack 应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR 中的follower 全部接收成功。 所以Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。 acks 参数配置: acks:
配置编号 | 描述 | 存在的问题 |
---|---|---|
0 | producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回 | 当broker 故障时有可能丢失数据 |
1 | producer 等待broker 的ack,partition 的leader 落盘成功后返回ack | 如果在follower |
同步成功之前leader 故障,那么将会丢失数据 | ||
-1(all) | producer 等待broker 的ack,partition 的leader 和follower 全部落盘成功后才返回ack | 如果在follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。 |
acks = 1 数据丢失案例
acks = -1 数据重复案例
4) 故障处理细节
HW(High Watermark) 指的是消费者能见到的最大的offset,ISR 队列中最小的LEO LEO(Log End Offset) 每个副本的最后一个offset (1)Follower故障 follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了 (2)Leader故障 leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
1.2.3 Exactly Once 语义
At Least Once 将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,但是不能保证数据不重复 At Most Once 将服务器 ACK 级别设置为** 0,可以保证生产者每条消息只会被发送一次,但是不能保证数据不丢失**
对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义
在0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。 At Least Once + 幂等性= Exactly Once
要启用幂等性,只需要将Producer 的参数中enable.idompotence 设置为true 即可。Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer 在初始化的时候会被分配一个PID,发往同一Partition 的消息会附带Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。 但是PID 重启就会变化,同时不同的Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
1.3 Kafka消费者
1.3.1 消费方式
consumer 采用pull(拉)模式从broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull 模式则可以根据consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为timeout。
1.3.2 分区分配策略
一个consumer group 中有多个consumer,一个topic 有多个partition,所以必然会涉及到partition 的分配问题,即确定那个partition 由哪个consumer 来消费。 Kafka 有两种分配策略,一是RoundRobin,一是Range。
1)分配分区的前提条件
首先kafka设定了默认的消费逻辑:一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费。 在这个消费逻辑设定下,假设目前某消费组内只有一个消费者C0,订阅了一个topic,这个topic包含6个分区,也就是说这个消费者C0订阅了6个分区,这时候可能会发生下列三种情况:
- 如果这时候消费者组内新增了一个消费者C1,这个时候就需要把之前分配给C0的6个分区拿出来3个分配给C1;
- 如果这时候这个topic多了一些分区,就要按照某种策略,把多出来的分区分配给C0和C1;
- 如果这时候C1消费者挂掉了或者退出了,不在消费者组里了,那所有的分区需要再次分配给C0。
总结一下,这三种情况其实就是kafka进行分区分配的前提条件:
- 同一个 Consumer Group 内新增消费者;
- 订阅的主题新增分区;
-
消费者离开当前所属的Consumer Group,包括shuts down 或 crashes。
只有满足了这三个条件的任意一个,才会进行分区分配 。分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到本节提到的分区分配策略。kafka提供了消费者客户端参数partition.assignment.strategy用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用range分配策略。除此之外,Kafka中还提供了roundrobin分配策略和sticky分区分配策略。消费者客户端参数partition.asssignment.strategy可以配置多个分配策略,把它们以逗号分隔就可以了。
- RoundRobin 轮询调度 针对消费组
- Range **针对Topic**
(1)RoundRobin (2)Range
2)RoundRobin分配策略
RoundRobin策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询算法逐个将分区以此分配给每个消费者。
使用RoundRobin分配策略时会出现两种情况:
- 如果同一消费组内,所有的消费者订阅的消息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。
- 如果同一消费者组内,所订阅的消息是不相同的,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候,此消费者将不会分配到这个 topic 的任何分区。
我们分别举例说明: 第一种:比如我们有3个消费者(C0,C1,C2),都订阅了2个主题(T0 和 T1)并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所有分区可以标识为T0p0、T0p1、T0p2、T1p0、T1p1、T1p2。此时使用RoundRobin分配策略后,得到的分区分配结果如下:
消费者线程 | 对应消费的分区序号 |
---|---|
C0 | T0p0、T1p0 |
C1 | T0p1、T1p1 |
C2 | T0p2、T1p2 |
可以看到,这时候的分区分配策略是比较平均的。
第二种:比如我们依然有3个消费者(C0,C1,C2),他们合在一起订阅了 3 个主题:T0、T1 和 T2(C0订阅的是主题T0,消费者C1订阅的是主题T0和T1,消费者C2订阅的是主题T0、T1和T2),这 3 个主题分别有 1、2、3 个分区(即:T0有1个分区(p0),T1有2个分区(p0、p1),T2有3个分区(p0、p1、p2)),即整个消费者所订阅的所有分区可以标识为 T0p0、T1p0、T1p1、T2p0、T2p1、T2p2。此时如果使用RoundRobin分配策略,得到的分区分配结果如下:
消费者线程 | 对应消费的分区序号 |
---|---|
C0 | T0p0 |
C1 | T1p0 |
C2 | T1p1、T2p0、T2p1、T2p2 |
这时候显然分配是不均匀的,因此在使用RoundRobin分配策略时,为了保证得均匀的分区分配结果,需要满足两个条件:
- 同一个消费者组里的每个消费者订阅的主题必须相同;
- 同一个消费者组里面的所有消费者的num.streams(消费者线程)必须相等。
如果无法满足,那最好不要使用RoundRobin分配策略。
3)Range分配策略
Range分配策略是面向每个主题的,首先会对同一个主题里面的分区按照序号进行排序,并把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
我们假设有个名为T1的主题,包含了7个分区,它有两个消费者(C0和C1),其中C0的num.streams(消费者线程) = 1,C1的num.streams = 2。排序后的分区是:0,1,2,3,4,5,6;消费者线程排序后是:C0-0,C1-0,C1-1;一共有7个分区,3个消费者线程,进行计算7/3=2…1,商为2余数为1,则每个消费者线程消费2个分区,并且前面1个消费者线程多消费一个分区,结果会是这样的:
消费者线程 | 对应消费的分区序号 |
---|---|
C0-0 | 0,1,2 |
C1-0 | 3,4 |
C1-1 | 5,6 |
这样看好像还没什么问题,但是一般在咱们实际生产环境下,会有多个主题,我们假设有3个主题(T1,T2,T3),都有7个分区,那么按照咱们上面这种Range分配策略分配后的消费结果如下:
消费者线程 | 对应消费的分区序号 |
---|---|
C0-0 | T1(0,1,2),T2(0,1,2),T3(0,1,2) |
C1-0 | T1(3,4),T2(3,4),T3(3,4) |
C1-1 | T1(5,6),T2(5,6),T3(5,6) |
我们可以发现,在这种情况下,C0-0消费线程要多消费3个分区,这显然是不合理的,其实这就是Range分区分配策略的缺点。
4)ticky分配策略
最后介绍一下Sticky分配策略,这种分配策略是在kafka的0.11.X版本才开始引入的,是目前最复杂也是最优秀的分配策略。 Sticky分配策略的原理比较复杂,它的设计主要实现了两个目的:
- 分区的分配要尽可能的均匀;
- 分区的分配尽可能的与上次分配的保持相同。
如果这两个目的发生了冲突,优先实现第一个目的。
我们举例进行分析:比如我们有3个消费者(C0,C1,C2),都订阅了2个主题(T0 和 T1)并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所有分区可以标识为T0p0、T0p1、T0p2、T1p0、T1p1、T1p2。此时使用Sticky分配策略后,得到的分区分配结果如下:
消费者线程 | 对应消费的分区序号 |
---|---|
C0 | T0p0、T1p0 |
C1 | T0p1、T1p1 |
C2 | T0p2、T1p2 |
哈哈,这里可能会惊呼,怎么和前面RoundRobin分配策略一样,其实底层实现并不一样。这里假设C2故障退出了消费者组,然后需要对分区进行再平衡操作,如果使用的是RoundRobin分配策略,它会按照消费者C0和C1进行重新轮询分配,再平衡后的结果如下:
消费者线程 | 对应消费的分区序号 |
---|---|
C0 | T0p0、T0p2、T1p1 |
C1 | T0p1、T1p0、T1p2 |
但是如果使用的是Sticky分配策略,再平衡后的结果会是这样:
消费者线程 | 对应消费的分区序号 |
---|---|
C0 | T0p0、T1p0、T0p2 |
C1 | T0p1、T1p1、T1p2 |
看出区别了吗?Stiky分配策略保留了再平衡之前的消费分配结果,并将原来消费者C2的分配结果分配给了剩余的两个消费者C0和C1,最终C0和C1的分配还保持了均衡。这时候再体会一下sticky(翻译为:粘粘的)这个词汇的意思,是不是豁然开朗了。
为什么要这么处理呢? 这是因为发生分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次处理一遍,这时就会浪费系统资源。而使用Sticky策略就可以让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而可以减少系统资源的损耗以及其它异常情况的发生。
1.4 offset的维护
在现实情况下,消费者在消费数据时可能会出现各种会导致宕机的故障问题,这个时候,如果消费者后续恢复了,它就需要从发生故障前的位置开始继续消费,而不是从头开始消费。所以消费者需要实时的记录自己消费到了哪个offset,便于后续发生故障恢复后继续消费。Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为 __consumer_offsets 。
offset的维护很简单,之所以单独列出来,是因为offset维护针对不同的kafka版本进行的处理是不同的,这点需要注意。
1.5 Kafka 高效读写数据
1) 顺序写磁盘
Kafka 的producer 生产数据,要写入到log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2) 零复制技术
数据直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。
1.6 Zookeeper 在Kafka 中的作用
Kafka 集群中有一个broker 会被选举为Controller,负责管理集群broker 的上下线,所有topic 的分区副本分配和leader 选举等工作。 Controller 的管理工作都是依赖于Zookeeper 的。
2种leader
1)Controller leader
当broker启动的时候,都会创建KafkaController对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以成为leader,其余的都是follower。当leader故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader
4)Partition leader
由controller leader执行 以下为partition 的leader 选举过程:
1.7 Kafka事务
Kafka 从0.11 版本开始引入了事务支持。事务可以保证Kafka 在Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
1)Producer 事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer 获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的Transaction ID 获得原来的PID。 为了管理Transaction,Kafka 引入了一个新的组件Transaction Coordinator。Producer 就是通过和Transaction Coordinator 交互获得Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入Kafka 的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
2)Consumer 事务
上述事务机制主要是从Producer 方面考虑,对于Consumer 而言,事务的保证就会相对较弱,尤其时无法保证Commit 的信息被精确消费。这是由于Consumer 可以通过offset 访问任意信息,而且不同的Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。