IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka日志清理

2020-03-31

kafka的消息是持久化到磁盘中的,随着消息的不断写入磁盘空间也在不断增加,需要对消息做一定的清理操作。kafka提供了几种日志清理的操作主要分为两类日志删除和日志压缩。下面我们分别进行分析。

broker端参数 log.cleanup.policy来设置日志清理策略,默认是delete,即采用日志删除的清理策略。还有一种就是日志压缩 log.cleanup.policy=compact,并且还需要将log.cleaner.enable=true。日志的清理粒度可以控制在主题级别,主题的参数是cleanup.policy默认是delete。主题的日志清除策略优先级更高。

1、日志删除

kafka的日志管理器有一个日志清理的删除任务周期性的检测和删除不符合保留条件的日志分段文件,这个周期是Broker端的参数log.retention.check.interval.ms进行配置的,默认是300000,即5分钟。日志分段的保留策略有三种基于时间、基于日志大小、基于日志起始偏移量。

log.cleanup.policy=delete(Kafka中所有用户创建的topics,默认均为此策略)

日志分段的保留策略有三种如果都配置了则同时生效,任意一个生效都执行清除任务

1.1、基于时间

基于时间的删除策略就是日志删除任务会定期检查当前的日志文件中是否有超过设置的保留时间的日志分段文件。broker端的日志保留时间参数有三个分别是log.retention.ms、log.retention.minutes、log.retention.hours优先级依次递减其中log.retention.ms的优先级最高。默认配置的是log.retention.hours=168,即日志只保留最近7天的数据。

执行步骤

步骤一、查找过期的日志片段不是简单的根据日志片段的最后一次修改时间(lastModifiedTime)进行判断,因为日志片段的修改时间可能被篡改。lastModifiedTime不能反映当前的日志片段最后持久化到磁盘的时间。首先查找该日志片段对应的时间戳索引文件,获取最后一条时间戳索引项,若大于0则取其值进行比较,否则取当前日志片段的修改时间lastModifiedTime。

步骤二、若待删除的日志片段集合等于目前所在日志分区的日志片段集,说明所有的日志片段都过期了,但是需要保留一个日志片段用于接收消息的写入,此时就会先创建一个新的日志片段和对应的两个索引文件,然后再执行删除操作。

步骤三、删除日志分段时首先会从Log对象中维护的ConcurrentSkipListMap跳跃表中移除待删除的日志片段,以保证没有现成对这些日志片段进行读取操作。然后将对应的日志片段文件后缀添加.deleted的后缀(也包括对应的索引文件)。最后交由以delete-file命名的延迟任务来删除以.deleted为后缀的文件,这个延迟时间可以通过file.delete.delay.ms参数来配置,默认值是60000,即1分钟。

1.2、基于日志大小

日志删除任务会检查当前日主的大小是否超过设定的值,来查找需要删除的日志分段文件集合。Broker端的参数配置log.retention.bytes来配置,默认是-1,即表示无穷大。注意log.retention.bytes设置的是Log中所有文件的总大小(后缀是.log日志文件的大小),不是单个日志片段的大小。

首先计算日志文件的总的大小和设置的保存的日志大小的差值,计算需要删除的日志总的大小,然后从日志文件的第一日志分段开始查找可删除的日志分段的文件集合,找到后和基于时间的清理一样进行相应的删除操作。

1.3、基于日志起始偏移量

一般情况下日志文件的起始偏移量logStartOffset等于第一个日志分段的baseOffset,但并不是绝对的,logStartOffset的值可以通过DeleteRecordsRequest请求、日志清理和截断等操作进行修改

日志起始偏移量的判断依据是某个日志分段的下一个日志分段的baseOffset是否小于等于logStartOffset,若是则删除此日志分段。

如果我们设置的日志文件的起始偏移量logStartOffset=30,那么我们从头查找则第一个片段是非删除需要根据第二个片段的11比较logStartOffset>11则第一个日志分段加入到deletableSegments中,依次比较就获得了图中所示的前三个片段加入deletableSegments即待删除的日志分段。

扫描完成后删除日志的操作和基于时间的删除操作一致。

2、日志压缩

Kafka中的日志压缩(Log Compaction)是指在默认的日志删除的规则之外提供的一种清理数据的方式。Log Compaction对于相同的Key的不同的value值只保留最后一个版本。

log.cleanup.policy=compact(topic __consumer_offsets 默认为此策略)

Log Compaction是基于key的压缩,使用这种策略要保证每条消息都要有key不能为NULL

我们在位移主题的章节中也强调过位移主题就是使用这个策略,原理如图所示:

Log Compaction执行前后,日志分段中的每条消息的偏移量不变。Log Compaction会生成新的日志片段文件,日志分段中的每条消息的物理位置会重新按照新文件来组织。Log Compaction执行后的偏移量不再是连续的。不过并不影响日志的查询。

kafka的日志存放目录下都有一个cleaner-offset-checkpoint的文件,这个文件就是清理检查点文件,用来记录每个主题每个分区中已经清理的偏移量。

logStartOffset到firstDirtyOffset标识已经清理过了clean部分,firstDirtyOffset(与cleaner checkpoint相等)表示dirty部分的起始偏移量,而firstUncleanableOffset为dirty部分的截止偏移量,整个dirty部分的偏移量范围为[firstDirtyOffset, firstUncleanableOffset),注意这里是左闭右开区间。为了避免当前活跃的日志分段activeSegment成为热点文件,activeSegment不会参与Log Compaction的操作。同时Kafka支持通过参数log.cleaner.min.compaction.lag.ms(默认值为0)来配置消息在被清理前的最小保留时间,默认情况下firstUncleanableOffset等于activeSegment的baseOffset。

每个broker会启动log.cleaner.thread(默认值为1)个日志清理线程负责执行清理任务,这些线程会选择“污浊率”最高的日志文件进行清理(判断哪一个主题对应的分区下的日志分段文件需要清理)。用cleanBytes表示clean部分的日志占用大小,dirtyBytes表示dirty部分的日志占用大小,那么这个日志的污浊率(dirtyRatio)为:dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes),即待清理的日志片段的大小/(已经清理的日志片段文件大小+待清理的日志片段的大小)为了防止日志不必要的频繁清理操作,Kafka还使用了参数log.cleaner.min.cleanable.ratio(默认值为0.5)来限定可进行清理操作的最小污浊率。

Kafka中的每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型如下图所示。第二次遍历检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理掉。假设一条消息的offset为offset1,这条消息的key在SkimpyOffsetMap中所对应的offset为offset2,如果offset1>=offset2即为满足保留条件。

默认情况下SkimpyOffsetMap使用MD5来计算key的哈希值,占用空间大小为16B,根据这个哈希值来从SkimpyOffsetMap中找到对应的槽位,如果发生冲突则用线性探测法处理。为了防止哈希冲突过于频繁,我们也可以通过broker端参数log.cleaner.io.buffer.load.factor(默认值为0.9)来调整负载因子。偏移量占用空间大小为8B,故一个映射项占用大小为24B。每个日志清理线程的SkimpyOffsetMap的内存占用大小为log.cleaner.dedupe.buffer.size / log.cleaner.thread,默认值为 = 128MB/1 = 128MB。所以默认情况下SkimpyOffsetMap可以保存128MB * 0.9 /24B ≈ 5033164个key的记录。假设每条消息的大小为1KB,那么这个SkimpyOffsetMap可以用来映射4.8GB的日志文件,而如果有重复的key,那么这个数值还会增大,整体上来说SkimpyOffsetMap极大的节省了内存空间且非常高效。

虽然使用了MD5来计算key的哈希值,还是可能遇到两个不同的KEY的hash值可能相同,那么可能出现其中一个KEY对应的消息丢失。

Log Compaction会保留key的最新value,那么相同key的其他消息如何清除呢?Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。

Log Compaction执行过后的日志分段的大小会比原先的日志分段的要小,为了防止出现太多的小文件,Kafka在实际清理过程中并不对单个的日志分段进行单独清理,而是会将日志文件中offset从0至firstUncleanableOffset的所有日志分段进行分组,每个日志分段只属于一组,分组策略为:按照日志分段的顺序遍历,每组中日志分段的占用空间大小之和不超过segmentSize(可以通过broker端参数log.segments.bytes设置,默认值为1GB),且对应的索引文件占用大小之和不超过maxIndexSize(可以通过broker端参数log.index.interval.bytes设置,默认值为10MB)。同一个组的多个日志分段清理过后,只会生成一个新的日志分段。

第一次清理后日志分段文件大小分别是0.4、0.4、0.3、0.7、0.3。第二次Log Compaction合并日志分段文件[0.4,0.4]、[0.3,0.7]、[0.3]、[1]这四个组都没有超过1G。第三次Log Compaction过程中会将对每个日志分组中需要保留的消息拷贝到一个以“.clean”为后缀的临时文件中,此临时文件以当前日志分组中第一个日志分段的文件名命名,例如:00000000000000000000.log.clean。Log Compaction过后将“.clean”的文件修改为以“.swap”后缀的文件,例如:00000000000000000000.log.swap,然后删除掉原本的日志文件,最后才把文件的“.swap”后缀去掉,整个过程中的索引文件的变换也是如此,至此一个完整Log Compaction操作才算完成。

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/03/31/Kafka日志清理/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka Log Clean
  • kafka Log Compaction
  • kafka

扫一扫,分享到微信

微信分享二维码
Kafka消费组位移重设
Kafka日志存储
  1. 1. 1、日志删除
    1. 1.1. 1.1、基于时间
    2. 1.2. 1.2、基于日志大小
    3. 1.3. 1.3、基于日志起始偏移量
  2. 2. 2、日志压缩
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水