IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka位移主题

2020-03-19

上一节我们介绍了位移提交,消费的位移如何保存时其中提到了位移主题__consumer_offsets这个是Kafka的内部主题,还有一个内部主题 __transaction_state(事务有关),后面我们再介绍,内部主题由Kafka自身进行维护,本节我们将揭开内部主题__consumer_offsets神秘的面纱。

1、背景

  • 老版本
    • Consumer 重启后根据 ZK 中 offset 从上次位置继续消费
    • 但是 ZK 不适合高频写操作
  • 新版本
    • 将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中
    • 因此该 Topic 的主要作用:保存 Kafka Consumer offset

将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer_offsets 中。可以这么说,consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

这里我想再次强调一下,和你创建的其他主题一样,位移主题就是普通的 Kafka 主题。你可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,你其实并不需要“搭理”它,也不用花心思去管理它,把它丢给 Kafka 就完事了。

2、位移主题创建

在 Kafka 0.11 之前,当 Kafka 自动创建该主题时,它会综合考虑当前运行的 Broker 台数和 Broker 端参数 offsets.topic.replication.factor 值,然后取两者的较小值作为该主题的副本数,但这就违背了用户设置 offsets.topic.replication.factor 的初衷。这正是很多用户感到困扰的地方:我的集群中有 100 台 Broker,offsets.topic.replication.factor 也设成了 3,为什么我的 __consumer_offsets 主题只有 1 个副本?

其实,这就是因为这个主题是在只有一台 Broker 启动时被创建的。在 0.11 版本之后,社区修正了这个问题。也就是说,0.11 之后,Kafka 会严格遵守 offsets.topic.replication.factor 值。如果当前运行的 Broker 数量小于 offsets.topic.replication.factor 值,Kafka 会创建主题失败,并显式抛出异常。

你也可以选择手动创建位移主题,具体方法就是,在 Kafka 集群尚未启动任何 Consumer 之前,使用 Kafka API 创建它。手动创建的好处在于,你可以创建满足你实际场景需要的位移主题。比如很多人说 50 个分区对我来讲太多了,我不想要这么多分区,那么你可以自己创建它,不用理会 offsets.topic.num.partitions 的值。不过我给你的建议是,还是让 Kafka 自动创建比较好。

目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果你自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。这是社区的一个 bug,目前代码已经修复了,但依然在审核中。

主题的副本值已经是 1 了,我们能否把它增加到 3 呢?当然可以。我们来看一下具体的方法。

第 1 步是创建一个 json 文件,显式提供 50 个分区对应的副本数。注意,replicas 中的 3 台 Broker 排列顺序不同,目的是将 Leader 副本均匀地分散在 Broker 上。该文件具体格式如下:

1
2
3
4
5
6
7
8
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}

第 2 步是执行 kafka-reassign-partitions 脚本,命令如下

1
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题

Kafka 自动创建,分区数配置根据 Broker 参数 offsets.topic.num.partitions,默认是 50,Kafka 会创建一个 50 分区的位移主题。

位移主题的副本数取决于 Broker 参数 offsets.topic.replication.factor,默认是 1。

Kafka 集群尚未启动任何 Consumer 之前,使用 Kafka API 创建它(不建议自己创建)

3、消息格式

你可能会好奇,这个主题存的到底是什么格式的消息呢?所谓的消息格式,你可以简单地理解为是一个 KV 对。Key 和 Value 分别表示消息的键值和消息体,在 Kafka 中它们就是字节数组而已。想象一下,如果让你来设计这个主题,你觉得消息格式应该长什么样子呢?我先不说社区的设计方案,我们自己先来设计一下。

首先从 Key 说起。一个 Kafka 集群中的 Consumer 数量会有很多,既然这个主题保存的是 Consumer 的位移数据,那么消息格式中必须要有字段来标识这个位移数据是哪个 Consumer 的。这种数据放在哪个字段比较合适呢?显然放在 Key 中比较合适。

现在我们知道该主题消息的 Key 中应该保存标识 Consumer 的字段,那么,当前 Kafka 中什么字段能够标识 Consumer 呢?还记得之前我们说 Consumer Group 时提到的 Group ID 吗?没错,就是这个字段,它能够标识唯一的 Consumer Group。说到这里,我再多说几句。除了 Consumer Group,Kafka 还支持独立 Consumer,也称 Standalone Consumer。它的运行机制与 Consumer Group 完全不同,但是位移管理的机制却是相同的。因此,即使是 Standalone Consumer,也有自己的 Group ID 来标识它自己,所以也适用于这套消息格式。

现在知道 Key 中保存了 Group ID,但是只保存 Group ID 就可以了吗?别忘了,Consumer 提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key 中还应该保存 Consumer 要提交位移的分区。

好了,我们来总结一下我们的结论。位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >。如果你认同这样的结论,那么恭喜你,社区就是这么设计的!接下来,我们再来看看消息体的设计。也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。

当然了,位移主题的消息格式可不是只有这一种。事实上,它有 3 种消息格式。除了刚刚我们说的这种格式,还有 2 种格式:

  1. 用于保存 Consumer Group 信息的消息。
  2. 用于删除 Group 过期位移甚至是删除 Group 的消息。

第 1 种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册 Consumer Group 的就可以了。

第 2 种格式相对更加有名一些。它有个专属的名字:tombstone 消息,即墓碑消息,也称 delete mark。下次你在 Google 或百度中见到这些词,不用感到惊讶,它们指的是一个东西。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null,即空消息体。那么,何时会写入这类消息呢?一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

4、压实(Compact) 策略

我们上面分析了位移主题的产生的背景、消息格式以及创建时机,下面我们讨论一个问题,我们上一节说过了,消费位移的提交分为主动和手动提交两种方式。默认就是手动提交,每隔5秒提交一次消费位移,当然这个参数是可以修改的。

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。

如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

我们来举个极端一点的例子。假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。国内很多文献都将其翻译成压缩,我个人是有一点保留意见的。在英语中,压缩的专有术语是 Compression,它的原理和 Compaction 很不相同,我更倾向于翻译成压实,或干脆采用 JVM 垃圾回收中的术语:整理。

不管怎么翻译,Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。

图中位移为 0、2 和 3 的消息的 Key 都是 K1。意味着<Group ID,主题名,分区号 >相同存储了很多次消费位移,对于之前的消费位移我们是不关心的只会获取最新的一个。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/03/19/Kafka位移主题/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka位移主题
  • kafka

扫一扫,分享到微信

微信分享二维码
Kafka副本机制
Kafka位移提交
  1. 1. 1、背景
  2. 2. 2、位移主题创建
  3. 3. 3、消息格式
  4. 4. 4、压实(Compact) 策略
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水