IT小栈


RocketMQ Flush Strategies

2019-07-24

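We all know that RocketMQ persists messages to files. But what exactly is its flush strategy? Is every message written to disk the moment it is sent? For a high-performance message broker that cannot be the answer, if only because the performance cost would be too high. Let's look at what the strategy actually is.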

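1. Flush strategies

RocketMQ provides two flush strategies: synchronous flush and asynchronous flush.

Synchronous flush: once a message reaches the broker, RocketMQ has to persist it. With synchronous flush, after the data is in memory it must also be flushed to the commitlog file before the write counts as successful; only then is the producer told that the send succeeded.

Asynchronous flush: once the data is in memory, the producer is told that the send succeeded, and the data is flushed to the commitlog file afterwards.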

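Flush mode         | Advantage                | Disadvantage                                            | Suitable for
Synchronous flush  | Messages are not lost    | Lower throughput than asynchronous flush                | Scenarios that require high message reliability
Asynchronous flush | Higher system throughput | Some messages may be lost on power failure and the like | Scenarios that require high throughput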
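The difference between the two modes comes down to when the acknowledgement is sent relative to the force-to-disk call. The sketch below is not RocketMQ code: FlushModes, putSync and putAsync are hypothetical names, and a plain java.nio FileChannel on a temporary file stands in for the commitlog.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushModes {
    /** Sync flush: acknowledge only after force() has returned. */
    static boolean putSync(FileChannel log, byte[] msg) throws IOException {
        log.write(ByteBuffer.wrap(msg)); // data reaches the OS page cache
        log.force(false);                // block until it is on the storage device
        return true;                     // only now acknowledge the producer
    }

    /** Async flush: acknowledge right after the write; force() happens later. */
    static boolean putAsync(FileChannel log, byte[] msg) throws IOException {
        log.write(ByteBuffer.wrap(msg)); // data reaches the OS page cache
        return true;                     // acknowledged before any force()
    }

    /** Writes one message in each mode and returns the resulting file size. */
    static long demo() {
        try {
            Path file = Files.createTempFile("commitlog-demo", ".log");
            try (FileChannel log = FileChannel.open(file, StandardOpenOption.WRITE)) {
                putSync(log, "m1".getBytes(StandardCharsets.UTF_8));
                putAsync(log, "m2".getBytes(StandardCharsets.UTF_8));
                log.force(false); // what a background flush thread would do later
                return log.size();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes written: " + demo());
    }
}
```

If the process or machine dies between putAsync() returning and the later force(), the acknowledged message may be gone; that window is the price of the higher throughput.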

Next we walk through the source code to see how this is implemented.

2. Synchronous flush

The core flush method, handleDiskFlush(), is called from CommitLog.putMessage():

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronous flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            // The producer asked to wait until the message has been stored
            if (messageExt.isWaitStoreMsgOK()) {
                // Build the flush request; nextOffset = offset where this write started + bytes written
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // Queue the request. The background thread runs at least every 10 ms and
                // flushes all requests queued in the meantime as one batch
                service.putRequest(request);
                // Wait for the flush result (5 s at most by default; returns as soon as the flush succeeds)
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        } else {
            // Asynchronous flush
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                // Wake the FlushRealTimeService thread
                flushCommitLogService.wakeup();
            } else {
                // Wake the CommitRealTimeService thread
                commitLogService.wakeup();
            }
        }
    }
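The sender thread blocks in waitForFlush() until the flush thread calls wakeupCustomer() or the timeout expires. One plausible way to build such a hand-off is a CountDownLatch, sketched below. FlushRequest and sendAndWait are hypothetical names used for illustration; this is not the GroupCommitRequest source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FlushRequestDemo {
    /** Hypothetical stand-in for GroupCommitRequest. */
    static final class FlushRequest {
        private final long nextOffset;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile boolean flushOK = false;

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        long getNextOffset() {
            return nextOffset;
        }

        /** Called by the flush thread once the outcome is known. */
        void wakeup(boolean ok) {
            this.flushOK = ok;
            done.countDown();
        }

        /** Called by the sender thread; false means timeout or failed flush. */
        boolean waitForFlush(long timeoutMillis) {
            try {
                boolean signalled = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
                return signalled && flushOK;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Simulates a flush thread that completes the request after delayMillis. */
    static boolean sendAndWait(long delayMillis, long timeoutMillis) {
        FlushRequest request = new FlushRequest(128L);
        Thread flusher = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            request.wakeup(true);
        });
        flusher.setDaemon(true);
        flusher.start();
        return request.waitForFlush(timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("fast flush acknowledged: " + sendAndWait(10, 2000));
        System.out.println("slow flush acknowledged: " + sendAndWait(1000, 50));
    }
}
```

The second call corresponds to the FLUSH_DISK_TIMEOUT branch above: the data may still reach the disk later, but the sender has stopped waiting.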

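The core class for synchronous flushing is GroupCommitService. Its key fields are:

    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

requestsWrite: the write list, to which sender threads add flush requests.
requestsRead: the read list, from which the service thread takes the requests it is going to flush. Separating reads from writes is one of the nice design points of GroupCommitService: each time the requests in requestsRead have been processed, the two lists are swapped.

Here is its run() method: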

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // Wait to be notified. If a request arrives, the wait ends early and
                // onWaitEnd() calls swapRequests(), moving the pending requests
                // from requestsWrite to requestsRead
                this.waitForRunning(10);
                // Perform the flush
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        // code omitted...
    }

At the end of waitForRunning(), the swapRequests() method is executed:

    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }
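The swap can be tried in isolation. Below is a minimal sketch of the two-list pattern, assuming plain Long offsets in place of GroupCommitRequest objects; SwapQueueDemo and drainOnce are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class SwapQueueDemo {
    private List<Long> requestsWrite = new ArrayList<>();
    private List<Long> requestsRead = new ArrayList<>();

    /** Producer side: only ever touches the write list, under a short lock. */
    public synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    /** Swap the lists so the consumer receives everything queued so far. */
    private synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /**
     * Consumer side: takes one batch and returns the highest offset in it,
     * or -1 if the batch was empty. Producers keep appending to the other
     * list while this loop runs, so they are never blocked by the processing.
     */
    public long drainOnce() {
        swapRequests();
        long max = -1L;
        for (long offset : requestsRead) {
            max = Math.max(max, offset);
        }
        requestsRead.clear();
        return max;
    }

    public static void main(String[] args) {
        SwapQueueDemo queue = new SwapQueueDemo();
        queue.putRequest(10L);
        queue.putRequest(30L);
        queue.putRequest(20L);
        System.out.println("first batch max offset: " + queue.drainOnce());
        System.out.println("second batch max offset: " + queue.drainOnce());
    }
}
```

The lock is held only for an add() or a three-assignment swap, never for the duration of a disk flush.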

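Flush requests received by GroupCommitService are added to the requestsWrite list through putRequest(). swapRequests() then exchanges the two lists, so that the queued requests end up in requestsRead, ready to be flushed. The method to focus on is doCommit():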

    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                // Process each flush request in turn
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        // Already flushed? Compare the flushed position with the offset this request needs
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        if (!flushOK) {
                            // 0 means flush immediately, however little data is pending
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
                    // Hand the flush result back to the waiting sender
                    req.wakeupCustomer(flushOK);
                }
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                // Record the time of the flush in the checkpoint
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                // Empty the read list
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
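The check getFlushedWhere() >= req.getNextOffset() is what makes this a group commit: one flush can satisfy every request whose data was already in memory at that moment. A simplified model of that effect, with hypothetical names (GroupCommitDemo, commitBatch) and a counter standing in for the disk:

```java
import java.util.Arrays;
import java.util.List;

public class GroupCommitDemo {
    private long wrotePosition; // how far data has been written to memory
    private long flushedWhere;  // how far data is known to be on disk
    private int flushCalls;     // number of (expensive) flush operations

    void append(int bytes) {
        wrotePosition += bytes;
    }

    /** Stand-in for mappedFileQueue.flush(0): persists everything written so far. */
    private void flush() {
        flushedWhere = wrotePosition;
        flushCalls++;
    }

    /** Flushes only when a request is not yet covered; returns how many were acknowledged. */
    int commitBatch(List<Long> nextOffsets) {
        int acknowledged = 0;
        for (long nextOffset : nextOffsets) {
            if (flushedWhere < nextOffset) {
                flush();
            }
            if (flushedWhere >= nextOffset) {
                acknowledged++;
            }
        }
        return acknowledged;
    }

    int getFlushCalls() {
        return flushCalls;
    }

    public static void main(String[] args) {
        GroupCommitDemo log = new GroupCommitDemo();
        log.append(100);
        log.append(100);
        log.append(100);
        int acked = log.commitBatch(Arrays.asList(100L, 200L, 300L));
        // prints "3 requests acknowledged with 1 flush call(s)"
        System.out.println(acked + " requests acknowledged with " + log.getFlushCalls() + " flush call(s)");
    }
}
```

The first request triggers the only flush; the other two find their offsets already covered and are acknowledged without touching the disk again.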

mappedFileQueue.flush(0) flushes immediately:

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // Flush; returns the flushed position within this file
            int offset = mappedFile.flush(flushLeastPages);
            // Absolute flushed offset; everything before it is already on disk
            long where = mappedFile.getFileFromOffset() + offset;
            // true means the flushed offset did not move, i.e. nothing new was flushed
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }

mappedFile.flush(0) guarantees an immediate flush. The asynchronous paths described below also end up calling mappedFile.flush().

3. Asynchronous flush

    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // Wake the FlushRealTimeService thread
        flushCommitLogService.wakeup();
    } else {
        // Wake the CommitRealTimeService thread
        commitLogService.wakeup();
    }

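As this branch shows, asynchronous flushing comes in two variants. When the off-heap memory pool is enabled, the CommitRealTimeService thread is used; otherwise the FlushRealTimeService thread, which is the default, does the flushing. TransientStorePoolEnable is covered in the article on RocketMQ memory mapping (《RocketMQ内存映射》), in the part about creating the mapped file MappedFile.

[Figure 3-1: processing flow of the two asynchronous flush variants]

3.1 FlushRealTimeService

Here is its run() method: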

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Interval between rounds, 200 ms by default
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // Minimum number of pages per commit, 4 by default
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // Once the last commit time plus this value has passed, commitDataLeastPages is set to 0; 200 ms by default
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            // More than 200 ms since the last round: act immediately, with the page minimum set to 0
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
            try {
                // Write the data out
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }

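This variant works like synchronous flushing, except that the page-count argument acts as a threshold: data is written out only once enough of it has accumulated, which improves flush throughput. Note that the listing above calls mappedFileQueue.commit() and reads the commit* settings, exactly like the CommitRealTimeService loop in the next subsection. In the RocketMQ 4.x source, FlushRealTimeService.run() has the same shape but reads getFlushIntervalCommitLog(), getFlushCommitLogLeastPages() and getFlushCommitLogThoroughInterval(), and calls mappedFileQueue.flush(flushPhysicQueueLeastPages).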
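The time-based override at the top of the loop can be isolated into a small function. The sketch below uses hypothetical names (ThoroughIntervalDemo, leastPagesForRound); the values 4 and 200 are the defaults quoted in the listing's comments.

```java
public class ThoroughIntervalDemo {
    /**
     * Mirrors the check at the top of the loop: once thoroughIntervalMillis
     * has elapsed since lastTimestamp, the page threshold drops to 0 for
     * this round; otherwise the configured threshold applies.
     */
    static int leastPagesForRound(long now, long lastTimestamp,
                                  int configuredLeastPages, int thoroughIntervalMillis) {
        if (now >= lastTimestamp + thoroughIntervalMillis) {
            return 0; // overdue: write out whatever is pending, however little
        }
        return configuredLeastPages;
    }

    public static void main(String[] args) {
        long last = 1_000L;
        System.out.println(leastPagesForRound(1_100L, last, 4, 200)); // prints 4
        System.out.println(leastPagesForRound(1_200L, last, 4, 200)); // prints 0
    }
}
```

The threshold keeps small writes batched under load, while the interval bounds how long a trickle of data can sit unwritten.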

3.2 CommitRealTimeService

Here is its run() method:

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Interval between commits, 200 ms by default
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // Minimum number of pages per commit, 4 by default
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // Once the last commit time plus this value has passed, commitDataLeastPages is set to 0; 200 ms by default
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            // More than 200 ms since the last commit: commit immediately, with the page minimum set to 0
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
            try {
                // Commit the buffered data
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                // false means some data was committed to the fileChannel
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }
                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }

The write-out call here is a different one: mappedFileQueue.commit() calls MappedFile.commit():

    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        // If less than commitLeastPages of data is pending, skip this round and wait for the next one
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }

The core of the commit is commit0():

    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();
        if (writePos - this.committedPosition.get() > 0) {
            try {
                // Create a view that shares writeBuffer's content
                ByteBuffer byteBuffer = writeBuffer.slice();
                // Move the position back to where the last commit ended
                byteBuffer.position(lastCommittedPosition);
                // Set the limit to writePos
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                // Write the data between committedPosition and wrotePosition to the fileChannel
                this.fileChannel.write(byteBuffer);
                // Advance committedPosition to writePos
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
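The slice/position/limit sequence can be exercised outside RocketMQ with a direct buffer and a temporary file. In the sketch below, CommitSliceDemo, append and commit are hypothetical names, and the two int variables play the roles of wrotePosition and committedPosition. One assumption is made explicit in the code: appends also go through a slice, so the backing buffer's own position stays at 0 and every slice() covers the whole buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class CommitSliceDemo {
    /** Appends text at offset `wrote` through a slice; returns the new write position. */
    static int append(ByteBuffer writeBuffer, int wrote, String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer view = writeBuffer.slice(); // writeBuffer's own position stays at 0
        view.position(wrote);
        view.put(data);
        return wrote + data.length;
    }

    /** Copies bytes [committed, wrote) into the channel; returns the new committed position. */
    static int commit(ByteBuffer writeBuffer, FileChannel channel, int committed, int wrote)
            throws IOException {
        if (wrote - committed <= 0) {
            return committed; // nothing new to commit
        }
        ByteBuffer view = writeBuffer.slice(); // shared content, independent position and limit
        view.position(committed);
        view.limit(wrote);
        channel.position(committed);
        while (view.hasRemaining()) {
            channel.write(view);
        }
        return wrote;
    }

    /** Appends and commits twice, then returns what ended up in the file. */
    static String demo() {
        try {
            Path file = Files.createTempFile("commit-demo", ".log");
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(64);
                int wrote = 0;
                int committed = 0;
                wrote = append(writeBuffer, wrote, "hello ");
                committed = commit(writeBuffer, channel, committed, wrote);
                wrote = append(writeBuffer, wrote, "world");
                committed = commit(writeBuffer, channel, committed, wrote);
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing here forces data to the device; as in commit0(), the bytes only move from the buffer into the file channel, and a separate force() is still needed.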

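commit0() only copies the buffered data into the fileChannel. As seen in CommitRealTimeService.run(), the flushCommitLogService thread is then woken so that the data in the fileChannel gets flushed to disk. So both variants go through flushCommitLogService.run(), and both end in MappedFile.flush(int):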

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
                // Record the position up to which data has been flushed
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }
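The two branches correspond to two standard java.nio calls: FileChannel.force(boolean) for data written through the channel, and MappedByteBuffer.force() for data written through a memory mapping. A minimal sketch on a temporary file follows; ForceDemo is a hypothetical name. Unlike RocketMQ, which uses only one of the two paths per file, the sketch uses both on the same file purely to show the calls side by side.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ForceDemo {
    /** Writes 4 bytes via the channel and 4 via a mapping, forcing each; returns the file content. */
    static String demo() {
        try {
            Path file = Files.createTempFile("force-demo", ".log");
            file.toFile().deleteOnExit(); // a mapped file cannot always be deleted right away
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Path 1: write through the channel, persist with FileChannel.force.
                // false = file content only; metadata updates need not be forced.
                channel.write(ByteBuffer.wrap("chan".getBytes(StandardCharsets.UTF_8)), 0);
                channel.force(false);

                // Path 2: write through a mapping, persist with MappedByteBuffer.force.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 8);
                mapped.position(4);
                mapped.put("mmap".getBytes(StandardCharsets.UTF_8));
                mapped.force();

                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```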

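The two buffering variants take different branches in this method; Figure 3-1 shows the processing flow of both.

There is one more method worth noting, isAbleToFlush(), which decides whether a flush is needed: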

    private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();
        if (this.isFull()) {
            return true;
        }
        if (flushLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }
        return write > flush;
    }
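A few concrete numbers make the page arithmetic easier to follow. The sketch below copies the comparison into a standalone method; PageThresholdDemo and ableToFlush are hypothetical names, the isFull() shortcut is left out, and OS_PAGE_SIZE is assumed to be 1024 * 4 as in RocketMQ's MappedFile.

```java
public class PageThresholdDemo {
    static final int OS_PAGE_SIZE = 1024 * 4;

    /** Same arithmetic as isAbleToFlush(), minus the isFull() shortcut. */
    static boolean ableToFlush(int flushedPosition, int readPosition, int flushLeastPages) {
        if (flushLeastPages > 0) {
            // Both positions are rounded down to whole pages before subtracting
            return (readPosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return readPosition > flushedPosition;
    }

    public static void main(String[] args) {
        System.out.println(ableToFlush(0, 16383, 4));    // false: one byte short of 4 pages
        System.out.println(ableToFlush(0, 16384, 4));    // true: exactly 4 pages pending
        System.out.println(ableToFlush(4095, 16384, 4)); // true: 4 page boundaries apart
        System.out.println(ableToFlush(4096, 20479, 4)); // false: only 3 page boundaries apart
        System.out.println(ableToFlush(0, 1, 0));        // true: threshold 0, any pending byte counts
    }
}
```

Because each position is divided by the page size separately, the check counts page boundaries rather than exact bytes: the third case passes with 12,289 bytes pending, while the fourth fails with 16,383.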

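With synchronous flushing, flushLeastPages is 0, so the flush happens immediately.

With asynchronous flushing, flushLeastPages is 4 by default: data is flushed only once at least 4 OS pages of it are pending. Alternatively, once the thorough interval has elapsed since the last flush (200 ms in the listings above), flushLeastPages is set to 0 and the flush happens immediately.

Synchronous flushing flushes immediately regardless of message size, and the sending thread blocks until the flush result is available.

Asynchronous flushing has two variants, but both follow the same rule: flush only when the pending data reaches 4 times OS_PAGE_SIZE, that is (1024 * 4) * 4 = 16 KB, or when the thorough interval (200 ms in the listings above) has elapsed since the last flush. This improves flush throughput.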

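Author: 顾 明 训
Link: https://www.itzones.cn/2019/07/24/RocketMQ刷盘策略/
License: This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. Please credit the source when reposting.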