IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka无消息丢失配置

2020-04-02

Kafka在0.11 版本之前确实存在可能数据丢失的情况我们在《Kafka HW与Leader Epoch》一文中也有相应的说明,但是最新版本已经没有这些问题了,可能现在网上还在流传Kafka丢失数据,那很有肯能就是我们的配置问题,一些原理还没搞懂。本节我们重点分析下无消息丢失的参数如何配置。

我们先界定一下怎么才叫kafka丢失数据,有些锅我们可不背。Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。这些数据保证数据不会丢失。

第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。

那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署 Kafka Broker 服务器了。现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说 Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

1、发送的消息已经持久化到磁盘中了

2、消息所在的Broker至少要保证有一台是正常运行状态的

1、生产者程序

生产端我们需要保证消息已经被持久化到磁盘中,很多时候使我们对生产者的API没有理解到位,消息并没有发送到Broker端,我们可能还会责怪Kafka丢数据。

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。

可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这么来看,让 Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。

不过,就算不是 Kafka 的“锅”,我们也要解决这个问题吧。实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的 callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Properties props = new Properties();
//kafka 地址
props.put("bootstrap.servers","10.10.15.235:9092;10.10.15.236:9092;10.10.15.237:9092");
//确认有几个副本接收到消息
//1:一个副本 (默认值)
//0:直接返回
//-1或all:所有副本接收到
props.put(ProducerConfig.ACKS_CONFIG,"all");
//客户端发送失败重试次数默认是0(可重试的异常才起效果,如网络问题等)
props.put(ProducerConfig.RETRIES_CONFIG,3);
//key序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//value序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
final Producer<String,String> producer = new KafkaProducer<String,String>(props);
String topic = "gumx";
String message = "发送消息!";
//异步发送,执行回调方法(异常时才会调用)
producer.send(new ProducerRecord<String,String>(topic,message),new Callback() {
@Override
public void onCompletion(RecordMetadata record, Exception e) {
if(null != e){
//发送异常处理
//相关的业务处理
e.printStackTrace();
}
}
});

1、不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。

2、设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明“所有副本” Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。这里的所有副本指代的是ISR中的副本。

3、设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

2、消费者程序

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。

比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。

要解决Kafka 中 Consumer 端的消息丢失,办法很简单:维持先消费消息,再更新位移的顺序即可。这样就能最大限度地保证消息不丢失。

Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。这里的关键在于 Consumer 自动提交位移,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。

这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。在这里我要提醒你一下,单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
try{
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1000));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
if(count % 100 == 0){
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
}
count++;
}
}
}catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(offsets); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
}

3、Broker端配置

设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

设置 replication.factor >= 3。副本因子,这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。

确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

注意这里的min.insync.replicas和producer参数中的ack=all的关系

只有当ack=all(-1)时min.insync.replicas才生效。

引入min.insync.replicas的目的就是为了做一个下限的限制:不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas。

4、特别需要注意

Kafka 还有一种特别隐秘的消息丢失场景:增加主题分区。当增加主题分区后,在某段“不凑巧”的时间间隔后,Producer 先于 Consumer 感知到新增加的分区,而 Consumer 设置的是“从最新位移处”开始读取消息,因此在 Consumer 感知到新分区前,Producer 发送的这些消息就全部“丢失”了,或者说 Consumer 无法读取到这些消息。

解决方式就是在consumer的消费参数中设置auto.offset.reset=earliest,那么我们解释一下为什么,默认我们是使用“latest”,当Consumer 感知到新分区时分区中已经有消息了,那么默认会从尾部进行消费;而使用了“earliest”,由于该消费组的对应该主题的新分区还没有记录消费位移,那么默认就会使用这个消费位点的策略,从头消费,这样就保证了消息不会丢失

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/04/02/Kafka无消息丢失配置/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka消息丢失
  • kafka

扫一扫,分享到微信

微信分享二维码
Kafka认证授权
Kafka消费组位移重设
  1. 1. 1、生产者程序
  2. 2. 2、消费者程序
  3. 3. 3、Broker端配置
  4. 4. 4、特别需要注意
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水