IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka消费组位移重设

2020-04-01

Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重新消费的。这在很多实际场景中非常有用,可能我们需要获取前30分钟后的数据,或者指定某个位移之后的数据。

1、消费组开始消费位置

我们在研究如何设置消费组位移之前我们先看看消费者组在初始化的时候的消费是从哪开始的?在启动消费者的过程中,我们会指定消费的位置默认是从队列尾部开始消费,相关的参数配置auto.offset.reset=latest(默认值);当然我们也可以指定从队列的头部进行消费,设置auto.offset.reset=earliest,下面是一段关于消费者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.10.15.235:9092;10.10.15.236:9092;10.10.15.237:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"gumx_record");
//自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//偏移量提交的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//key序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//value序列化
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//从哪开始消费"latest"、 "earliest"、"none"
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("gumx"));
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(2000));
for(ConsumerRecord<String,String> record : records){
System.out.println("");
System.out.printf("消息消费,分区= %d,offset= %d,key= %s,value= %s%n",record.partition(),record.offset(),record.key(),record.value());
}
}

auto.offset.reset这个参数只有在group.id消费组下的某个消费者实例第一次启动的时候才会生效即(该消费组在服务器中还没有存储其消费位移时)。

auto.offset.reset=earliest 时并不代表,消费时的位移从0开始的,存在日志清理的可能,所谓的从头开始消费是指已存在的消息文件的开始消费

earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。

latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

2、重设位移

Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。

重设消费组位移方法有很多可分为两个维度

2.1、位移维度

这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。

策略 含义
Earliest 把位移调整到当前最早位移处,不一定是0,因为很久以前的消息可能被自动删除了
Latest 把位移调整到当前最新位移处
Current 把位移调整到当前最新提交位移处
Specified-Offset 把位移调整到指定移处
Shift-By-N 把位移调整到指定移处+N处,N允许负数

Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。

Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。

Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。

Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。

Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100

2.2、时间维度

我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。

策略 含义
DateTime 把位移调整到大于给定时间的最小位移处
Duration 把位移调整到距离当前时间指定间隔位移处, 具体格式类似于java8的 java.time.Duration

DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。

Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。

3、设置方式

3.1、消费者 API 方式设置

通过 Java API 的方式来重设位移,你需要调用 KafkaConsumer 的 seek 方法,或者是它的变种方法 seekToBeginning 和 seekToEnd。我们来看下它们的方法签名。

1
2
3
4
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

根据方法的定义,我们可以知道,每次调用 seek 方法只能重设一个分区的位移。OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值。seek 的变种方法 seekToBeginning 和 seekToEnd 则拥有一次重设多个分区的能力。我们在调用它们时,可以一次性传入多个主题分区。

3.1.1、Earliest策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.15.235:9092;10.10.15.236:9092;10.10.15.237:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gumx_record");
// 禁止自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 偏移量提交的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// key序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

String topic = "gumx";
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);
consumer.seekToBeginning(consumer.partitionsFor(topic).stream()
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
} catch (Exception e) {
e.printStackTrace();
}

注意事项:

你要创建的消费者程序,要禁止自动提交位移。

组 ID 要设置成你要重设的消费者组的组 ID。

调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。

最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。

3.1.2、Latest 策略

代码省略,其他代码可参考Earliest策略中的

1
2
consumer.seekToEnd( consumer.partitionsFor(topic).stream()
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));

3.1.3、Current 策略

实现 Current 策略的方法很简单,我们需要借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移

1
2
3
4
5
6
consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.forEach(tp -> {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});

这段代码首先调用 partitionsFor 方法获取给定主题的所有分区,然后依次获取对应分区上的已提交位移,最后通过 seek 方法重设位移到已提交位移处。

3.1.4、 Specified-Offset 策略

1
2
3
4
5
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}

3.1.5、Shift-By-N 策略

1
2
3
4
5
6
for(PartitionInfo info : consumer.partitionsFor(topic)) { 
TopicPartition tp = new TopicPartition(topic, info.partition());
// 假设向前跳123条消息
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);}
}

3.1.6、DateTime 策略

如果要实现 DateTime 策略,我们需要借助另一个方法:KafkaConsumer. offsetsForTimes 方法。假设我们要重设位移到 2019 年 6 月 20 日晚上 8 点,那么具体代码如下:

1
2
3
4
5
6
7
8
long ts = LocalDateTime.of(2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch =
consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> ts));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}

这段代码构造了 LocalDateTime 实例,然后利用它去查找对应的位移值,最后调用 seek,实现了重设位移。

3.1.7、Duration 策略

假设我们要将位移调回 30 分钟前,那么代码如下:

1
2
3
4
5
6
7
Map<TopicPartition, Long> timeToSearch = 
consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry:consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}

3.2、命令行方式设置

3.2.1、Earliest 策略直接指定–to-earliest

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

3.2.2、Latest 策略直接指定–to-latest。

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

3.2.3、Current 策略直接指定–to-curren

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

3.2.4、Specified-Offset 策略直接指定–to-offset

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset --execute

3.2.5、Shift-By-N 策略直接指定–shift-by N

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --shift-by --execute

3.2.6、DateTime 策略直接指定–to-datetime

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-datetime 2019-06-20T20:00:00.000 --execute

3.2.7、Duration 策略,我们直接指定–by-duration

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S --execute

注意:命令行中的设置都是针对消费组中的所有主题(–all-topics)进行设置的,当然我们可以消费组中的单个主题进行设置,甚至是消费组中的某个主题下的某个分区。

对gumx这个主题进行操作设置到从头开始消费

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --topic gumx --to-earliest –execute

对gumx这个主题的分区编号是2的队列消息进行操作设置到从头开始消费

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --topic gumx:2 --to-earliest –execute
本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/04/01/Kafka消费组位移重设/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka消费位移设置
  • kafka

扫一扫,分享到微信

微信分享二维码
Kafka无消息丢失配置
Kafka日志清理
  1. 1. 1、消费组开始消费位置
  2. 2. 2、重设位移
    1. 2.1. 2.1、位移维度
    2. 2.2. 2.2、时间维度
  3. 3. 3、设置方式
    1. 3.1. 3.1、消费者 API 方式设置
      1. 3.1.1. 3.1.1、Earliest策略
      2. 3.1.2. 3.1.2、Latest 策略
      3. 3.1.3. 3.1.3、Current 策略
      4. 3.1.4. 3.1.4、 Specified-Offset 策略
      5. 3.1.5. 3.1.5、Shift-By-N 策略
      6. 3.1.6. 3.1.6、DateTime 策略
      7. 3.1.7. 3.1.7、Duration 策略
    2. 3.2. 3.2、命令行方式设置
      1. 3.2.1. 3.2.1、Earliest 策略直接指定–to-earliest
      2. 3.2.2. 3.2.2、Latest 策略直接指定–to-latest。
      3. 3.2.3. 3.2.3、Current 策略直接指定–to-curren
      4. 3.2.4. 3.2.4、Specified-Offset 策略直接指定–to-offset
      5. 3.2.5. 3.2.5、Shift-By-N 策略直接指定–shift-by N
      6. 3.2.6. 3.2.6、DateTime 策略直接指定–to-datetime
      7. 3.2.7. 3.2.7、Duration 策略,我们直接指定–by-duration
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水