AI 日报

带你吃透Kafka的可靠性设计

  • By 51ITO
  • Dec 06, 2023 - 2 min read



作者 | 蔡柱梁

审校 | 重楼

目录

  1. 前言
  2. 可靠性分析
  3. 副本设计
  4. leader选举机制
  5. 日志同步机制

1 前言

本文里面涉及到较多基础概念,如果忘记了,那么可以去看下《一文带你快速入门kafka》。

对于一个消息中间件而言,可靠性是是至关重要的要素之一。不管是面试或者实际工作中,我们都不得不面对几个问题:是几个九?消息会不会丢失?如何保证幂等?如何顺序消费?这篇文章中笔者会和大家一起去看 Kafka 是如何设计的。

2 可靠性分析

针对上面的几个问题,Kafka 需要考虑包括不限于以下问题:

  • 可用性

– Kafka 支持分布式架构,实现了故障转移,避免单点问题

如何避免脑裂问题(这个要了解 Kafka 的 leader 选举机制)

– 多副本机制,支持容灾备份

  1. 数据一致性如何保证
  2. 数据同步要如何实现
  • 消息问题

– 生产者投递消息

broker回复投递成功,但是消息丢失了。出现这种情况,一般是以下几种情况:

  1. acks 配置成 0,生产者以为自己投递成功了,但其实并没有成功写入 leader
  2. 消息持久化在 leader 分区,但是没有同步给 follower 就宕机了

这个问题也好解决:生产者可以在发送消息前,先将消息持久化。至多是用了存储空间,现在磁盘空间可以说是最不值钱的了,而且我们还可以定期进行归档压缩/删除处理,问题不大。

– 消费者消费消息遇到消息丢失或者消息重复处理

  • 消息丢失

消息丢失一般是以下这几种情况:

  1. 消费者拿到消息了,但是处理过程中发生异常
  2. 消费者提交消费位移的设计不合理

针对这个问题,我们通常拿到消息会选择将消息持久化在本地,然后再做消息处理,处理出问题也可以重复处理。这种设计满足我们大多数场景,但是对于消息生产速度远高于我们持久化速度的场景可能就不适用了,因为我们要考虑消息堆积问题。不管是这个问题,还是有些场景下无法找生产者重新投递消息的问题,都让我们期待着消息中间件可以支持消息回溯功能。

  • 重复消费

这个可以交由使用者自己做幂等处理

– 消息需要有序消费

我们知道 Kafka 是分区内消息有序的。当然,需要有序的消息就只能使用一个分区,无疑是以 Kafka 的水平扩展能力作为代价的。如果是需要全局有序,而我们又确定使用 Kafka,而且单分区的吞吐量不能满足要求,那么我们只能自己进行额外设计来保证了。

2.1 acks配置对消息丢失的影响

2.1.1 acks=1

消息成功写入 leader 后,就会告诉生产者投递成功

如上图例子,一共三个分区,其中 follower1 和 follower2 均属于 ISR。假设 leader 成功写入 3 和 4 之后,ISR 还没同步,leader 就宕机了,这样就丢失了 3 和 4,如下图

2.1.2 acks=-1 或者 acks=all

消息不仅要成功写入 leader,还要 ISR 中的所有 follower 同步完成后,才会告知生产者投递成功。

还是 2.1.1 的例子,这里无非会有两种情况

  • leader 在同步完成后宕机

  • leader 在同步完成前宕机

这个配置对 Kafka 的性能会有较大影响,需要自己斟酌得失。

2.2 unclean.leader.election.enable

这个配置是用来控制 Kafka 是否可以选举非 ISR 中的副本为 leader,0.11.0.0 之后的版本默认为 false。虽然设置为 true 可以提高 Kafka 的可用性,但是会降低 Kafka 数据的可靠性。

2.3 小结

上面提出的问题均有指出和 Kafka 相关部分的设计是哪些,这里再总结下:

  • 如何避免脑裂问题——了解 Kafka 的 leader 选举机制
  • 数据同步&数据一致性问题——了解 Kafka 的多副本设计
  • 消息顺序消费问题——了解 Kafka 的日志同步机制(分区有序)

3.副本设计

副本( Replica )是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。

我们知道 Kafka 通过多副本机制,增强了容灾备份的能力,并且实现了故障转移。这无疑是大大提高了 Kafka 的可用性,下面笔者会带着大家一起看 Kafka 的副本机制是如何设计的。

在此之前,先简单复习几个相关的概念:

  • 副本是相对分区而言的,即副本是指某个分区的副本
  • 在多副本情况下,其中一个副本为 leader,其它均为 follower。只有 leader 对外提供服务,follower 仅同步leader 数据
  • 分区中所有的副本集合称为 AR,ISR 是与 leader 保持同步状态的副本集合,leader 也是 ISR 中的一员
  • LEO 是每个分区下一条消息要写入的位置
  • HW 是 ISR 中最小的 LEO,消费者只能拉取 HW 之前的消息

3.1 失效副本

正常情况下,分区中所有副本都应该属于 ISR,但是网络具有不可靠性。因此,难免在某一个时刻会有一些成员会被踢出 ISR,这些副本要么处于同步失效状态,要么功能失效,这些副本统称为失效副本。

功能失效指的是无法工作,比如某个 broker 宕机了,那么在它上面的分区就会失效。

同步失效又是怎么判断是否同步失效的呢?是通过参数 replica.lag.time.max.ms 来判断的,默认是 10000 毫秒。当某个 follower 同步滞后 leader 的时间超过 10 秒,则判定为同步失效。

具体实现原理如下:

当 follower 将 leader LEO 之前的消息全部同步完成,那么会认为该 follower 已经追上 leader,并更新 lastCaughtUpTimeMs。Kafka 的副本管理器有一个副本过期检测的定时任务,如果发现当前时间 - lastCaughtUpTimeMs > 10秒,则判定同步失效。

除了时间设置以外,还有另一个参数 replica.lag.max.message(默认4000,这个是 broker 级别的参数),也是用来判定失效副本的。

一般情况下,这两个参数都是使用默认值就可以,因为如果没有调优经验,自己乱配置,容易导致 ISR 变动过于频繁。同时,需要监控失效副本的数量,因为它是衡量 Kafka 是否健康的一个很重要的指标。

PS:新加入的副本因子/宕机恢复后重新加入的副本在追赶上 leader 之前,也会一直处于失效状态。

3.1.1 失效副本的作用

失效副本为 Kafka 带来了什么收益呢?为什么需要设计这么一个状态呢?

大家不妨试想下:假设允许 ISR 中有一个副本同步一直跟不上 leader。当 leader 发生宕机时,这个 follower 被选举成了新的 leader,那么这时就会发生消息丢失。

一般会造成副本失效基本是以下两个原因:

  • follower 副本进程卡顿,在一段时间内无法发起同步请求,比如说频繁发生 FULL GC
  • follower 同步过慢,在一段时间内无法追上 leader,比如 I/O有问题(笔者实际工作中遇到过一次,公司搭建自己的物理机房,用了二手服务器,有一台服务器I/O老化导致读写数据慢,导致了副本失效,消息堆积等问题)

3.2 LEO 与 HW

这一小节会更进一步去讲解它们之间的关系,让大家可以更清楚 Kafka 的副本同步机制。

假设现在有 3 个 broker,某个 topic 有 1 个分区,3 个副本。现在有一个 producer 发送了一条消息,这 3 个副本会发生些什么操作。

具体步骤如下:

  1. producer 发送消息到 leader
  2. leader 将消息追加到日志,并且更新日志的偏移量
  3. follower 执行定时任务向 leader 发送 fetch request 同步数据,该请求会带上自己的 LEO
  4. leader 读取本地日志,并更新 follower 的信息
  5. leader 返回 fetch response 给 follower,response 会包含 HW
  6. follower 将消息追加到本地日志,并更新日志的偏移量

为了更直观地理解上面的步骤,下面将会用图来展示。

1.一个新建的 topic 被写入了 5 条消息,两个 follower 去拉取数据

2.leader 给 follower 返回 fetch response,并且 leader 又被写入了 5 条消息

 其中 follower1 同步了 2 条数据,而 follower2 同步了 3 条数据。

 而 follower 的 HW = min(自己的LEO, 同步回来的HW)

3.follower 再次同步数据,同时 leader 又被写入了 5 条消息

 leader 更新了 HW

4.leader 给 follower 返回 fetch response

 根据公式,follower 更新 HW = 3

在一个分区中,leader 所在 broker 会记录所有副本的 LEO 和 自己的 HW;而 follower 所在的 broker 只会记录自己的 LEO 和 HW。因此,在逻辑层面上,我们可以得到下图:

0.11.0.0版本之前,Kafka 是基于 HW 的同步机制,但是这个设计有可能出现数据丢失和数据不一致的问题。Kafka 后面的版本通过 leader epoch 来进行优化。

3.3 数据丢失 & 数据不一致的解决方案

3.2小节说到了 LEO 与 HW 的更新机制,并且提到这种设计可能会出现数据丢失和数据不一致。我们先一起来看下这两问题是如何产生的。

3.3.1 数据丢失

假设某一分区在某一时刻的状态如下图(L 代表是 leader)

可以看见副本A的 LEO 是 2,HW 是 1;副本B的 LEO 是 2,HW 是 2。显然,哪怕没有新的消息写入副本B中,副本A也要过一小段时间才能追上副本A,并更新 HW。

假设在副本A更新 HW = 2之前,A宕机了,随后立马就恢复。这里会有一个截断机制——根据宕机之前持久化的HW 恢复消息。也就是说,A只恢复了 m1,m2 丢失了。

再假设 A 刚恢复,B 也宕机了,A 成为了 leader。这时 B 又恢复了,并成为了 follower。由于 follower 的 HW 不能比 leader 的 HW 高,所以 B 的 m2 也丢失了。

总结:这里大家可以发现 follower 的 HW 更新是有一定间隙的,像我这个例子其实 follower 是拿到 m2 了,只不过 HW 要追上 leader 需要等下一次的 fetch request。除非配置 acks=-1 并且配置min.insync.replicas 大于 1,unclean.leader.election.enable = true 才行。

3.3.2 数据不一致

假设某一分区在某一时刻,副本A 的 HW = 2,LEO = 2;副本B 的 HW = 1,LEO = 1。

又假设它们同时挂了,B 先恢复。这时,B 会成为 leader,如下图

此时,B 写入新消息 m3,并将 HW、LEO 更新为 2。此时,A 也恢复了。由于 A 的 HW 也是 2,所以没有截断消息。如下图

这样一来,A 中 offset = 1 的消息是 m2,B 中 offset = 1 的消息是 m3,数据不一致了。

3.3.3 leader epoch

为了解决 3.3.1 和 3.3.2 的问题,Kafka 从 0.11.0.0 开始引入 leader epoch,在需要截断时使用 leader epoch 作为依据,而不再是 HW。

如果看框架代码比较多的同学应该知道 epoch 是相当于版本的这么一个概念。leader epoch 的初始值是 0,每变更一次 leader,leader epoch 就会增加 1。另外,每个副本中还会增加一个矢量 StartOffset>,其中 StartOffset 是当前 leader epoch 下写入第一条消息的偏移量。每个副本的 Log 下都有一个 leader-epoch-checkpoint 文件,在发生 leader 变更时,会将对应的矢量追加到这个文件中。

3.3.3.1 解决数据丢失问题

还是3.3.1的例子,只不过多了 leader epoch 矢量信息。

副本A:HW=1,LEO=2,LE(leader epoch)=0,Offset(StartOffset)=0

leader-副本B:HW=2,LEO=2,LE=0,Offset(StartOffset)=0

假设在副本A更新 HW = 2之前,A宕机了,随后立马就恢复。不过这里不会立马进行截断日志操作,而是会发送一个 OffsetsForLeaderEpochRequest 请求给 B,B 作为目前的 leader 在收到请求之后会返回 OffsetsForLeaderEpochResponse 给 A。

我们先来看下 OffsetsForLeaderEpochRequest 和 OffsetsForLeaderEpochResponse 的数据结构。如下图

  • OffsetsForLeaderEpochRequest

 A 会将自己的 leader epoch 信息给 leader(A的 leader epoch 这里简化成 LE_A)。这里会出现两种情况:

– 变更了 leader

 B 会返回 LE_A+1 的 StartOffset 给 A

– 没有变更 leader

 B 会返回 A 的 LEO 给 A

 因此,我们可以把 OffsetsForLeaderEpochRequest 看作是一个查询 follower 当前 leader_epoch 的 LEO。

  • OffsetsForLeaderEpochResponse

这个例子中,B 会返回2给 A,而此时的 A 的 LEO 刚好是 2,所以不用进行截断日志。如下图:

如果此时B挂了,A成了 leader,并有 m3 写入,就会得到下图

可以看见 m2 并没有丢失,并且也更新了 leader_epoch 矢量为 (1,2)。

3.3.3.2 解决数据不一致问题

上图是3.3.2的例子。副本A是 leader,B 是 follower。

A 的 HW=2,LEO=2,LE=(0,0)

B 的 HW=1,LEO=1,LE=(0,0)

此时,A 和 B 同时宕机,并且 B 先恢复成为了 leader。此时,epoch 变成了 1。另外,新消息 m3 成功写入,就会得到下图

接着,A 也恢复了,这时 A 不会急着截断日志,而是给 leader 发送 OffsetsForLeaderEpochRequest,B 会返回 LEO = 1 给 A。因此,A 会截断日志,删除 m2。之后,再给 B 发送 fetch request,得到 B 的响应并更新后,将得到下图

这样数据不一致问题就解决了。

这里大家可能会有疑问,m2不是丢失了吗?是的,这种设计因为更新具有一定的间隙,并且没有事务管理,所以会有丢失消息的风险。

从 CAP 定理来看,这里的设计属于 AP。为什么这么说呢?大家不妨想一下,如果为了不丢失数据,这里加了事务控制的设计,那么对于分区而言它的吞吐量是会下降的,甚至是不可用的,因为响应速度是由短板的副本所决定的。对于定位是高吞吐量的 Kafka 而言,这显然是不可接受的。

3.4 小结

Kafka 通过多副本机制增强了容灾备份的能力,并且基于多副本机制实现了故障转移,避免了单点问题,但同时也引进了新的问题——数据丢失和数据不一致。从 0.11.0.0 版本开始,Kafka 增加了 leader epoch,它对这两个问题进行了优化。虽然无法完全避免消息丢失,但是从实际的使用角度而言,这个问题其实并不大。有实际工作经验的同学应该都知道,我们发送消息难以避免需要重推,哪怕消息中间件做到了百分百不丢失,其实我们在使用时仍然会做防止消息丢失的设计。相对而言,数据一致性就更重要了,否则很容易让订阅消息的下游系统出现脏数据。

4 leader 选举机制

在 Kafka 集群中会有一个或者多个 broker,其中有一个 broker 会被选举为控制器,它负责管理整个集群中所有分区和副本的状态。分区的 leader 出现故障时,由控制器负责为其选举新的 leader;当某个分区的 ISR 发生变化时,由控制器负责通知所有 broker 更新其元数据信息;当某个 topic 的分区数量发生变化时,还是由控制器负责分区的重新分配。因此,只要控制器正常工作,分区的 leader 就是唯一的,不会有脑裂问题。

那么 Kafka 是如何保证控制器只有一个的呢?如果控制器发生异常了怎么办?控制器的选举和异常恢复又是怎样的?

4.1 控制器

控制器是 broker 维度的角色,它负责管理整个集群中所有分区和副本的状态。

Kafka 中的控制器选举工作依赖 ZooKeeper,成功竞选为控制器的 broker 会在 ZooKeeper 中创建 /controller 临时节点,节点会存储以下信息:

{
 "version ": 1,
 "brokerid": 0,
 "timestamp": "1529210278988"
}

其中 version 目前是固定值不用管,brokerid 是成为控制器的 broker 的 id,timestamp 是 broker.id=0 的 broker 成为控制器的时间戳。

在任意时刻,Kafka 集群中有且仅有一个控制器。每个 broker 启动时会尝试读取 ZooKeeper 的 /controller 节点的 brokerid 的值,如果 brokerid ≠ -1,则表示当前集群已有控制器,broker 就会放弃竞选;如果不存在 /controller 节点,broker 就会尝试创建节点,创建成功的 broker 就会成为控制器,将自己的 ID 赋予 brokerid,而对于创建节点失败的 broker 则会在内存中保存当前控制器的 brokerid 值,这个值标识为 activeControllerId。

上面是启动 Kafka 集群以及正常情况下添加 broker 情况下的选举过程。那么当控制器出现故障时,就需要重新选举了。ZooKeeper 中还有一个与控制器有关的 /controller_epoch 节点,该节点是持久节点,里面存储了一个整型的 controller_epoch 值,初始值是 1。当控制器发生变化时,controller_epoch 就会加 1。每个和控制器交互的请求一定会带上 controller_epoch,当控制器发现请求带上的 controller_epoch 比自己内存的小,那么这个请求则是无效请求;如果请求带上的 controller_epoch 比自己内存的大,说明自己不再是控制器。由此可见,Kafka 是通过 controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。

这里再扩展下,说下作为控制器的 broker 多出来的责任:

  • 监听分区相关的变化

– 在 ZooKeeper 的 /admin/reassign_partitions 节点注册 PartitionReassignmentHandler,用来处理分区重分配的动作。

– 在 ZooKeeper 的 /isr_change_notification 节点注册 IsrChangeNotificetionHandler,用来处理 ISR 集合变更的动作。

– 在 ZooKeeper 的 /admin/preferred-replica-election 节点注册 PreferredReplicaElectionHandler,用来处理优先副本的选举动作。

  • 监听主题相关的变化

– 在 ZooKeeper 的 /brokers/topics 节点注册 TopicChangeHandler,用来处理主题增减的变化。

– 在 ZooKeeper 的 /admin/delete_topics 节点注册 TopicDeletionHandler,用来处理删除主题的动作。

  • 监听 broker 相关的变化

 在 ZooKeeper 的 /brokers/ids 节点注册 BrokerChangeHandler,用来处理 broker 增减的变化。

  • 从 ZooKeeper 中读取当前所有与主题、分区及 broker 有关的信息并进行相应的管理

 对所有主题对应的在 ZooKeeper 中的 /brokers/topics/ 节点添加 PartitionModificationsHandler,用来监听主题中的分区分配变化。

  • 启动并管理分区状态机和副本状态机
  • 更新集群的元数据信息
  • 如果设置了 auto.leader.rebalance.enable = true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的优先副本的均衡

成功竞选控制器的 broker 会在当选后,读取包括不限于上面提到的在 ZooKeeper 中的节点的数据,初始化上下文信息,并且进行管理。在 Kafka 中,因为会有大量需要读取或者更新上下文信息的操作,所以这里会有多线程问题。如果单纯采用锁机制实现,那么整体性能会大打折扣。因此,控制器采用的是单线程基于事件队列的模型。将所有相关的操作、事件进一步封装成一个个事件,按照事件发生的顺序存入 LinkedBlockingQueue 中,最后再使用一个专用线程按 FIFO 的原则处理各个事件。

控制器、非控制器 broker、ZooKeeper 的关系图如下:

只有控制器会注册相应的监听器关注节点的数据变化,其他 broker 则不关注这些节点的数据变化(除了 /controller)。因为所有 broker 都会关心当前的控制器到底是谁,当 /controller 的数据发生变化时,就要更新自己内存中的 activeControllerId。如果原来是控制器的 broker,发现自己现在不是了,就需要关闭资源,如注销只有控制器才需要的监听器等。不管什么原因造成 /controller 的 brokerid 变更,再重新选举控制器之前,要先确定参选的 broker 里面是否有前控制器,如果有,就要先“退位”,再开始新的选举。

优点:只有控制器注册监听器,可以有效避免严重依赖 ZooKeeper 的设计的通病——脑裂、羊群效应、ZooKeeper 过载。

5 日志同步机制

在多副本的设计中,要实现数据一致性和顺序性,最简单有效的办法就是选举 leader,由 leader 负责写入顺序,follower 复制同步即可。只要 leader 不出问题,如宕机、脑裂等,那么就不需要担心 follower 的数据同步问题。

不过,一个分布式系统肯定需要考虑故障转移的。这时就需要考虑 leader 宕机后,选举新 leader 的问题。上面讲了控制器会保证分区 leader 的唯一性,但是数据丢失的问题,还是需要 follower 里面有跟上 leader 的才行。因此,Kafka 里面会有 ISR 这么一个概念。另外,如果需要做到告知客户端成功提交了某条消息,就需要保证新 leader 里面必需有这条消息,那么需要配置 acks=all 等相关配置。

作者介绍

蔡柱梁,51CTO社区编辑,从事Java后端开发8年,做过传统项目广电BOSS系统,后投身互联网电商,负责过订单,TMS,中间件等。