IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka位移提交

2020-03-18

Kafka的消费模式都是手动拉取,没有RoceketMQ的推拉模式,这就需要我们知道每次消费后消息的消费位移就需要我们自己来维护,当然Kafka为了解放开发人员同时提供了自动提交方式,手动提交分为异步和同步提交位移,本章我们就重点分析各种方式的利弊。

1、消费者组

提到位移提交那自然和消费者有关,那就不得不提Kafka的消费者组的概念。消费者组,即 Consumer Group,应该算是 Kafka 比较有亮点的设计了。那么何谓 Consumer Group 呢?用一句话概括就是:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。

Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。

Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 内的消费者实例消费

一个消费者可能消费0到多个分区,例如:某个主题有4个分区,然而消费者有5个那么根据上一条原则,一个分区只能有一个消费者,就会有消费者没有分配到队列

2、消费位移

Consumer Group,Kafka 是如何管理位移的呢?消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中,我们如何保存这个位移数据,才能保证消息的消费可以正常进行不至于错乱、丢失、重复等。

切记不可与消息的位移等同,消息的位移指的是消息在某个分区持久化的位置,消费位移是指消息被消费者消费到的位置消息的位移>=消费位移

Kafka 有新旧客户端 API 之分,那自然也就有新旧 Consumer 之分。老版本的 Consumer 也有消费者组的概念,它和我们目前讨论的 Consumer Group 在使用感上并没有太多的不同,只是它管理位移的方式和新版本是不一样的。

老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。

新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是让人既爱又恨的 __consumer_offsets。我会在专栏后面的内容中专门介绍这个神秘的主题。不过,现在你需要记住新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中。后面我们会重点介绍位移主题。

如果消费者一直处于运行的状态那么这个偏移量没有什么用。不过如果这个消费者崩溃或者有新的消费者加入群组触发再均衡策略,那么再均衡之后该分区的消费者如果不是之前的那一位,那么新的小伙伴怎么知道之前的伙计消费到哪里呢。所以提交他自己的offset就发挥作用了

Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多pool多少个record。

那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。

在consumer中,还有另外一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法了。这就要求我们处理消息时的方法不易过重,处理时间过长(处理数据时间>max.poll.interval.ms)会被认为当前的consumer已经掉线。

3、自动提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Properties props = new Properties();
props.put("bootstrap.servers","10.10.15.235:9092;10.10.15.236:9092;10.10.15.237:9092");
props.put("group.id","gumx_record");
//自动提交偏移量
props.put("enable.auto.commit","true");
//偏移量提交的间隔时间
props.put("auto.commit.interval.ms","1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("gumx"));

while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(2000));
for(ConsumerRecord<String,String> record : records){
System.out.println("");
System.out.printf("消息消费,分区= %d,offset= %d,key= %s,value= %s%n",record.partition(),record.offset(),record.key(),record.value());
}
}

开启自动提交位移的方法很简单。Consumer 端有个参数 enable.auto.commit,把它设置为 true 或者压根不设置它就可以了。因为它的默认值就是 true,即 Java Consumer 默认就是自动提交位移的。如果启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 再均衡(Rebalance) 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

再均衡Rebalance,指的是消息消费的过程中由于消费组中加入、掉线消费者或订阅的主题增加了分区数等原因导致,目前消费者分配的消费格局被打破。并且在Rebalance过程中,消费者全部处于停滞状态,必须等待Rebalance完成才能恢复正常消费,后面章节中我们会重点介绍,以及如何避免这种现象。

4、手动同步提交

1
2
3
4
5
6
7
while (true) { 
ConsumerRecords records = consumer.poll(Duration.ofSeconds(2000));
// 处理消息
process(records);
// 同步提交
consumer.commitSync();
}

手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。

5、手动异步提交

1
2
3
4
5
6
7
8
9
10
11
12
while (true) { 
ConsumerRecords records = consumer.poll(Duration.ofSeconds(2000));
// 处理消息
process(records);
// 异步提交,offsets是个Map<TopicPartition, OffsetAndMetadata> 记录每个分区对应的消费位移
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
//异常处理
handle(exception);
}
);
}

异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法:

commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

消费者提了异步 commit 实际还没更新完offset,消费者再不断地poll,只要consumer没有重启,不会发生重复消费。因为在运行过程中consumer会记录已获取的消息位移

6、异步+同步手动提交

我们发现不管是异步还是同步都有其弊端,那么我们是不是可以结合两者一起使用。

  1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态,影响 TPS。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try { 
while(true) {
ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}

这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,所以,如果你需要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。

7、改进(推荐使用)

设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。

在 Kafka 中也是相同的道理。对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。

庆幸的是,Kafka Consumer API 为手动提交提供了这样的方法:commitSync(Map) 和 commitAsync(Map)。它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例,展示一段代码,实际上,commitSync 的调用方法和它是一模一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
try{
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1000));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
if(count % 100 == 0){
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
}
count++;
}
}
}catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(offsets); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}

最后一步consumer.commitSync(offsets); 提交很重要,我们需要自己去维护这个消费位移防止process(record); 消息处理过程抛异常导致将本次拉取的一批数据的位移进行提交。如果采用异步+同步手动提交中的代码范例,如果消息处理发生异常,consumer.commitSync();这种是默认将本次拉取的这批数据的最大消费位移提交,Kafka不知道在哪发生异常,这个由开发者控制默认就是全部处理完成。

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/03/18/Kafka位移提交/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka位移提交
  • kafka

扫一扫,分享到微信

微信分享二维码
Kafka位移主题
Kafka生产者
  1. 1. 1、消费者组
  2. 2. 2、消费位移
  3. 3. 3、自动提交
  4. 4. 4、手动同步提交
  5. 5. 5、手动异步提交
  6. 6. 6、异步+同步手动提交
  7. 7. 7、改进(推荐使用)
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水