IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka日志存储

2020-03-28

Kafka中的消息是以主题为基本单位进行归纳,各个主题逻辑上相互独立。每个主题又分为一个或多个分区,分区的数量在创建主题时指定后期也可以修改。每条消息会根据分区规则被追加到指定的分区中,分区中的每条消息都会分配一个唯一的序列号,就是所说的偏移量。

1、目录结构

每一个分区对应一个日志Log。为了防止Log过大Kafka又引入了日志分段LogSegment的概念,将Log进行切分为多个LogSegment,就是将一个大文件分割成多个小文件,这样便有消息的维护和清理。Log在物理上只以文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如事务的索引文件.txnindex结尾的文件)。

向Log中追加消息时是顺序写入,只有最后一个LogSegment才能执行写入操作,在此之前的所有的LogSegment都不能写入数据。我们将最后一个LogSegment成为“活跃的日志片段”,当消息不断加入,满足一定条件时,就会创建新的LogSegment,也就是“活跃的日志片段”,之后的消息会写入这个新创建的“活跃的日志片段”中。

我们发现每个日志片段都有两个索引文件,就是为了便于消息的检索,每一个LogSegment日志文件(.log结尾的文件)和两个索引文件,偏移量索引文件(.index结尾的文件)和时间戳索引文件(.timeindex结尾额的文件)。每一个LogSegment都有一个基准的偏移量表示当前文件第一条消息的偏移量offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量命名的,固定20位数字,没有达到的位数直接补0。第一个LogSegment的基准偏移量为0,对应的日志文件00000000000000000000.log。

我们看到主题为gumx分区编号是1的log文件夹下的日志情况。第二个LogSegment基准位移786851,也说明这个日志片段第一条消息的偏移量就是786851,同时反映了第一个LogSegment偏移量范围是0~786850的消息。

我们再看看Kafka的日志根目录下的文件情况。

Kafka第一次启动时就会在日志的根目录下生成5个文件,4个检查点文件(cleaner-offset-checkpoint、log-start-offset-checkpoint、recovery-point-offset-checkpoint、replication-offset-checkpoint)和meta.properties文件。

内部主题__consumer_offsets不是在Broker启动时创建,而是第一个消费者消费消息时会自动创建。

Kafka的Broker可以配置多个日志根目录,创建主题时会选择分区数最少的那个根目录来完成本次创建

2、日志格式

随着Kafka的迅猛发展,其消息格式也在不断的升级改进,中间经历过数个版本,本节将从kafka0.11.0版本开始所使用的消息格式讲起。

此版本参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。Varints是使用一个或多个字节来序列化整数的一种方法,数值越小,其所占用的字节数就越少。ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。

详细可以参考:https://developers.google.com/protocol-buffers/docs/encoding。

消息集为Record Batch,其内部也包含了一条或者多条消息,消息的格式参见下图中部和右部。在消息压缩的情形下,Record Batch Header部分(参见下图左部,从first offset到records count字段)是不被压缩的,而被压缩的是records字段中的所有内容。生产者客户端中的ProducerBatch对应的就是这里的Record Batch,而producerRecord对应的就是这个Record。

先来讲述一下消息格式Record的关键字段,可以看到内部字段大量采用了Varints,这样Kafka可以根据具体的值来确定需要几个字节来保存

  1. length:消息总长度。
  2. attributes:弃用,但是还是在消息格式中占据1B的大小,以备未来的格式扩展。
  3. timestamp delta:时间戳增量。通常一个timestamp需要占用8个字节,如果像这里保存与RecordBatch的起始时间戳的差值的话可以进一步的节省占用的字节数。
  4. offset delta:位移增量。保存与RecordBatch起始位移的差值,可以节省占用的字节数。
  5. headers:这个字段用来支持应用级别的扩展,而不需要像v0和v1版本一样不得不将一些应用级别的属性值嵌入在消息体里面。Header的格式如上图最有,包含key和value,一个Record里面可以包含0至多个Header

讲述一下消息集(RecordBatch)的关键字段

  1. first offset:表示当前RecordBatch的起始位移。
  2. length:计算partition leader epoch到headers之间的长度。
  3. partition leader epoch:用来确保数据可靠性。
  4. magic:消息格式的版本号,对于当前版本而言,magic等于2。
  5. attributes:消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考v0和v1;第4位表示时间戳类型;第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。第6位表示是否是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用来支持事务功能。
  6. last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要被broker用来确认RecordBatch中Records的组装正确性。
  7. first timestamp:RecordBatch中第一条Record的时间戳。
  8. max timestamp:RecordBatch中最大的时间戳,一般情况下是指最后一个Record的时间戳,和last offset delta的作用一样,用来确保消息组装的正确性。
  9. producer id:用来支持幂等性。
  10. producer epoch:和producer id一样,用来支持幂等性。
  11. first sequence:和producer id、producer epoch一样,用来支持幂等性。
  12. records count:RecordBatch中Record的个数

3、日志索引

每个日志分段文件对应两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量到物理地址的映射关系,快速定位物理文件的位置;时间戳索引文件根据指定的时间戳来查找对应的偏移量信息。

kafka中的索引文件以稀疏索引的方式构造消息的索引,并不保证每条消息在索引文件中都有都有对应的索引项。当写入一定量(由broker端参数log.index.interval.bytes指定,默认值4096即4k)的消息时偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项增大或者减少log.index.interval.bytes的值对应的可以减少或增加索引项的密度。

日志分段的一定条件满足其一即可

1、当前日志分段文件大小超过了Broker端设置的log.segment.bytes的值。默认是1073741824即1G。

2、当前日志分段中的消息的最大时间戳与当前的系统时间差值大于log.roll.ms或者log.roll.hours参数配置的值,如果两个都配置了,那么log.roll.ms优先级更高。默认情况是配置了log.roll.hours=168即7天。

3、偏移量索引文件或时间戳索引文件的大小达到broker端log.index.size.max.bytes的值。默认值为10485760,即10M。

4、追加消息的偏移量与当前日志分段的偏移量大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转换为相对偏移量(offset-baseOffset>Integer.MAX_VALUE)

对于当前活跃的日志片段而言,其他对应的索引文件内容已经固定不需要写入索引项,被设置为只读,当前活跃的日志片段需要写入更多的索引相关,设置为可读写。kafka在创建索引文件的时候为其预分配log.index.size.max.bytes大小的空间,这一点与日志分段文件不同,只有当索引文件进行切分的时候,kafka才会将该索引文件裁剪到实际的数据大小。

3.1、偏移量索引

在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。

  1. relativeOffset:相对偏移量,表示消息相对与基准偏移量的偏移量,占 4 个字节
  2. position:物理地址,消息在日志分段文件中对应的物理位置,也占 4 个字节

日志切割第四个条件,追加消息的偏移量与当前日志分段的偏移量大于Integer.MAX_VALUE,因为relativeOffset4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。

我们解析一下主题是gumx分区编号是1的偏移量查看内部实现细节进行分析

1
./kafka-dump-log.sh --files /usr/local/kafka/store/gumx-0/00000000000000000000.index

我们看到offset不是每个都有这就是我们提到的稀疏索引,不是每条消息都有索引项。

那么我们分析一下查找偏移量是15的消息该怎么查找?

kafka查询偏移量的时候使用二分查找法快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。那么对照图中那就是[13,19630],然后从日志分段文件中的物理位置是19630的开始顺序查找偏移量是15的消息。

上面我们分析的是最简单的情况,如果我们查找是日志中有多个日志分段,我们如何确定我们查找的消息在哪一个日志分段中。如下图所示我们查找偏移量是1438的消息。kafka的每个日志对象中使用了ConcurrentSkipListMap(跳跃表结构)来保存各个日志片段的baseOffset作为key,这样就可以根据指定的偏移量来查找消息所在的日志分段。查找到baseOffset是1346的,我们需要去索引文件中查找相对偏移量是(1438-1346=92),在对应的索引文件中查找偏移量是92的索引。

还有一点需要注意Kafka强制要求索引文件的大小必须是索引项的整数倍,我们上面分析了索引项是8个字节,则kafka索引文件大小也必须是8的整数倍,如果broker设置了log.index.size.max.bytes=68,那么kafka内部会将其转换为64,即不大于68的8的整数倍。

3.2、时间戳索引

时间戳的索引格式,每个索引项占用12个字节,分为两部分,时间戳和相对偏移量

  1. timestamp: 当前日志分段的最大时间戳
  2. relativeOffset:时间戳所对应的消息的相对偏移量

时间戳索引文件中包含若干个时间戳索引项,每个追加的时间戳索引项的timestamp必须大于之前的追加的索引项timestamp,否则并不追加。如果broker端的参数log.message.timestamp.type设置为LOgAppendTime,那么消息的时间戳必定是能够单调保持递增,相反如果是CreateTime类型则无法保证。生产者可以自己设置时间戳ProducerRecord(String topic,Integer partition,Long timestamp,K key,V value)的方法指定也无法保证消息的时间戳自增,因为我们无法保证生产者的服务器时间是时间同步的。容易造成时间戳乱序。

与偏移量索引文件相似,时间戳索引文件必须是索引项大小(12B)的整数倍即12的整数倍,同样的设置参数是Broker端的log.index.size.max.bytes如果配置是68则对应的索引文件kafka内部会进行转换60。

如果要查找指定时间戳的消息1585448614028开始的消息,我们分析下步骤

1、将指定时间戳和每一个日志分段的最大时间戳largestTimeStamp逐一比对,直到找到不小于指定时间戳的所对应的时间分段。那么时间戳索引文件中的largestTimeStamp如何获取,找到最后一条索引项,如果时间戳大于0则取其值,否则取该日志分段的最近修改时间。

2、找到相应的日志时间戳索引分段后,在时间戳索引文件使用二分查找查找到不大于指定时间戳的最大索引项,即[1585448613471,17],这样就找到了相对偏移量17。

3、在偏移量索引文件中使用二分算法查找到不大于17的最大索引项,即[16, 25672]。

4、根据从步骤一中选出的日志分段文件中查找25672的物理位置开始查找不小于指定时间戳的消息。

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/03/28/Kafka日志存储/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka日志存储
  • kafka

扫一扫,分享到微信

微信分享二维码
Kafka日志清理
Kafka消费者组重平衡
  1. 1. 1、目录结构
  2. 2. 2、日志格式
  3. 3. 3、日志索引
    1. 3.1. 3.1、偏移量索引
    2. 3.2. 3.2、时间戳索引
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水