IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

RocketMQ内存映射

2019-07-19

前面几节我们讲解的都是RocketMQ应用层面也有一些关于源码的分析讲解,消息的存储只是简单的分析没有深入的探讨其中的内部机制,我们都知道RocketMQ是基于文件存储的消息模型,文件存储给我们的直觉感受就是比较慢,为什么RocketMQ在消息的生产及消费都在毫秒级,真如我们所想象的文件存储就很慢吗?本节分析下消息的内部内存映射机制。

1、概述

我们首先看一下消息的发送流程图

前面我们在RocketMQ事务消息章节中消息发送中各个类的业务流程,我们只是分析到CommitLog.putMessage()方法,下面我们接着探讨。上图中就是该消息方法的流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
//设置消息存储到文件中的时间
msg.setStoreTimestamp(System.currentTimeMillis());
//设置消息的校验码CRC
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery消息的延迟级别是否大于0
if (msg.getDelayTimeLevel() > 0) {
//如果消息的延迟级别大于最大的延迟级别则置为最大延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//将消息主题设置为SCHEDULE_TOPIC_XXXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//将消息队列设置为延迟的消息队列的ID
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//消息的原有的主题和消息队列存入属性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
//获取最后一个消息的映射文件,mappedFileQueue可看作是CommitLog文件夹下的一个个文件的映射
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

//写入消息之前先申请putMessageLock,也就是保证消息写入CommitLog文件中串行的
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;

//设置消息的存储时间
msg.setStoreTimestamp(beginLockTimestamp);
//mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件
//mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件
if (null == mappedFile || mappedFile.isFull()) {
//里面的参数0代表偏移量
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
//mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//mappedFile文件后面追加消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
//释放锁
putMessageLock.unlock();
}

if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
//消息刷盘
handleDiskFlush(result, putMessageResult, msg);
//主从数据同步复制
handleHA(result, putMessageResult, msg);
return putMessageResult;
}

本章节我们重点分析三点,关于刷盘机制后面章节会介绍

  1. 获取映射文件MappedFile
  2. 创建映射文件MappedFile
  3. 映射文件中写入消息

2、获取映射文件MappedFile

2.1、MappedFile和Commitlog的关系

我们知道消息的存储文件时Commitlog文件中,那与内存映射对象MappedFile又有什么联系呢?

从图中我们可以清晰的知道他们之间的关系,每个MappedFile对象对于一个Commitlog文件,我们分析下这个对应关系的业务操作发生在什么时候,我们分析下源码

Broker服务启动时会创建BrokerController对象并对其初始化initialize()该方法调用DefaultMessageStore.load()方法加载Commitlog文件和消费队列文件

1
2
3
4
5
6
7
8
public boolean load() {
//省略代码...
// 加载Commitlog文件
result = result && this.commitLog.load();
// 加载消费队列文件
result = result && this.loadConsumeQueue();
//省略代码...
}

我们分析下commitLog.load()调用mappedFileQueue.load()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean load() {
//消息存储路径
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// 升序
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
//当前文件的写指针
mappedFile.setWrotePosition(this.mappedFileSize);
//刷写到磁盘指针,该指针之前的数据持久化到磁盘中
mappedFile.setFlushedPosition(this.mappedFileSize);
//当前文件的提交指针
mappedFile.setCommittedPosition(this.mappedFileSize);
//添加到MappedFile文件集合中
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}

很明显此方法就是MappedFile对象和一个Commitlog文件建立的逻辑关系

循环消息存储路径文件夹中的Commitlog文件,升序排列,创建MappedFile对象设置基础参数数据,添加到MappedFile文件集合中,我们查看new MappedFile(),调用MappedFile.init()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
//初始化的初始偏移量是文件名称
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
//创建读写文件通道NIO
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//将文件映射到内存
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}

将文件映射到内存。

上面我们分析了mappedFile和commitlog的逻辑建立关系,将mappedFile加入mappedFileQueue中,并讲解了MappedFile初始化的过程。

2.2、获取mappedFileQueue中最后一个mappedFile

上面我们了解到commitlog和mappedFile一一对应的关系,我们需要存储消息就需要找到最后一个未存满消息的commitlog文件,即查找的是最后一个mappedFiled对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;

while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}

return mappedFileLast;
}

该方法比较简单就是从mappedFiles集合中获取最后一个MappedFile对象,2.1中我们分析了其初始化的过程将MappedFile对象放入mappedFiles集合中。

3、创建映射文件MappedFile

当获取的MappedFile对象不存在或者消息已经存满我们需要创建,this.mappedFileQueue.getLastMappedFile(0)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
//创建映射问价的起始偏移量
long createOffset = -1;
//获取最后一个映射文件,如果为null或者写满则会执行创建逻辑
MappedFile mappedFileLast = getLastMappedFile();
//最后一个映射文件为null,创建一个新的映射文件
if (mappedFileLast == null) {
//计算将要创建的映射文件的起始偏移量
//如果startOffset<=mappedFileSize则起始偏移量为0
//如果startOffset>mappedFileSize则起始偏移量为是mappedFileSize的倍数
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
//映射文件满了,创建新的映射文件
if (mappedFileLast != null && mappedFileLast.isFull()) {
//创建的映射文件的偏移量等于最后一个映射文件的起始偏移量 + 映射文件的大小(commitlog文件大小)
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
//创建新的映射文件
if (createOffset != -1 && needCreate) {
//构造commitlog名称
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
//优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
//如果上述方式失败则通过new创建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}

allocateMappedFileService.putRequestAndReturnMappedFile()通过MappedFile服务类创建MappedFile

AllocateMappedFileService是创建MappedFile核心类,我们分析下该类

字段 类型 说明
waitTimeOut int 等待创建映射文件的超时时间,默认5秒
requestTable ConcurrentMap<String, AllocateRequest> 用来保存当前所有待处理的分配请求,其中KEY是filePath,VALUE是分配请求。如果分配请求被成功处理,即获取到映射文件则从请求会从requestTable中移除
requestQueue PriorityBlockingQueue<AllocateRequest> 分配请求队列,注意是优先级队列,从该队列中获取请求,进而根据请求创建映射文件
hasException boolean 标识是否发生异常
messageStore DefaultMessageStore

分析其核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
//默认提交两个请求
int canSubmitRequests = 2;
//当transientStorePoolEnable为true,刷盘方式是ASYNC_FLUSH,broker不是SLAVE,才启动TransientStorePool
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//启动快速失败策略时,计算TransientStorePool中剩余的buffer数量减去requestQueue中待分配的数量后,剩余的buffer数量
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
}
}

AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
//判断requestTable中是否存在该路径的分配请求,如果存在则说明该请求已经在排队中
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

//该路径没有在排队
if (nextPutOK) {
//如果剩余的buffer数量小于等于0则快速失败
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
//将指定的元素插入到此优先级队列中
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
//剩余的buffer数量减1
canSubmitRequests--;
}

//创建第二个映射文件
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
//检查buffer数量
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
//将指定的元素插入到此优先级队列中
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
//等待
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}

将创建请求插入到requestQueue和requestTable中,由于优先级队列中requestQueue存入的是AllocateRequest对象实现了compareTo方法,优先级的排序,由于创建MappedFile时传入的是预创建两个,我们需要创建最新的请求的结果,其他请求需要进行排队。

AllocateMappedFileService是个多线程类,内部实现了run()的核心方法mmapOperation()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
//检索并删除此队列的头,如有必要,等待元素可用
req = this.requestQueue.take();
//
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}

if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();

MappedFile mappedFile;
//判断TransientStorePoolEnable是否启用
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {//默认方式创建
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}

long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}

// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()
&& this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
//对MappedFile进行预热
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}

req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}

关于MMAP详细分析请查看-》linux内存映射mmap原理分析一文

知识点补充:PageCache与Mmap内存映射

这里有必要先稍微简单地介绍下page cache的概念。系统的所有文件I/O请求,操作系统都是通过page cache机制实现的。对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操作系统本身而决定,x86的linux中一个标准页面大小是4KB。

操作系统内核在处理文件I/O请求时,首先到page cache中查找(page cache中的每一个数据块都设置了文件以及偏移量地址信息),如果未命中,则启动磁盘I/O,将磁盘文件中的数据块加载到page cache中的一个空闲块,然后再copy到用户缓冲区中。 page cache本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。因此,想要提高page cache的命中率(尽量让访问的页在物理内存中),从硬件的角度来说肯定是物理内存越大越好。从操作系统层面来说,访问page cache时,即使只访问1k的消息,系统也会提前预读取更多的数据,在下次读取消息时, 就很可能可以命中内存。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,ConsumeQueue的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提升。 另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了)

(1)Mmap内存映射技术的特点

Mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用Mmap的方式其读/写的效率和性能都非常高。

(2)JDK NIO的MappedByteBuffer简要分析

从JDK的源码来看,MappedByteBuffer继承自ByteBuffer,其内部维护了一个逻辑地址变量—address。在建立映射关系时,MappedByteBuffer利用了JDK NIO的FileChannel类提供的map()方法把文件对象映射到虚拟内存。仔细看源码中map()方法的实现,可以发现最终其通过调用native方法map0()完成文件对象的映射工作,同时使用Util.newMappedByteBuffer()方法初始化MappedByteBuffer实例,但最终返回的是DirectByteBuffer的实例。在Java程序中使用MappedByteBuffer的get()方法来获取内存数据是最终通过DirectByteBuffer.get()方法实现(底层通过unsafe.getByte()方法,以“地址 + 偏移量”的方式获取指定映射至内存中的数据)。

(3)使用Mmap的限制

a.Mmap映射的内存空间释放的问题;由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;

b.MappedByteBuffer内存映射大小限制;因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;

c.使用MappedByteBuffe的其他问题;会存在内存占用率较高和文件关闭不确定性的问题;

我们发现有两种方式创建mappedFile对象

1、mappedFile = new MappedFile(req.getFilePath(), req.getFileSize())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
//初始化的初始偏移量是文件名称
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
//创建读写文件通道NIO
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//将文件映射到内存
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}

2、mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool())

1
2
3
4
5
6
7
8
//transientStorePoolEnable 为 true
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
//初始化MappedFile的writeBuffer
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}

同样调用了init(fileName, fileSize)方法。

TransientStorePool与MappedFile在数据处理上的差异在什么地方呢?分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。

那么与直接使用MappedByteBuffer相比差别在什么地方呢?修改MappedByteBuffer实际会将数据写入文件对应的Page Cache中,而TransientStorePool方案下写入的则为纯粹的内存。因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。

创建完mappedFile对象后,有个预热操作,每个字节填充(byte) 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
//创建一个新的字节缓冲区,其内容是此缓冲区内容的共享子序列
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
//记录上一次刷盘的字节数
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// 刷盘方式是同步策略时,进行刷盘操作
// 每修改pages个分页刷一次盘,相当于4096*4k = 16M 每16M刷一次盘,1G文件 1024M/16M = 64次
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}

// 防止垃圾回收GC
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
//刷盘,强制将此缓冲区内容的任何更改写入包含映射文件的存储设备
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);

this.mlock();
}

预热的目的:第一点,由于仅分配内存并进行mlock系统调用后并不会为程序完全锁定这些内存,因为其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ是在创建并分配MappedFile的过程中,预先写入一些随机值至Mmap映射出的内存空间里。第二,调用Mmap进行内存映射后,OS只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。程序要访问数据时OS会检查该部分的分页是否已经在内存中,如果不在,则发出一次缺页中断。这里,可以想象下1G的CommitLog需要发生多少次缺页中断,才能使得对应的数据才能完全加载至物理内存中(ps:X86的Linux中一个标准页面大小是4KB)?RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。

mlock()操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}

{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}

mlock系统调用:其可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到swap空间。对于RocketMQ这种的高吞吐量的分布式消息队列来说,追求的是消息读写低延迟,那么肯定希望尽可能地多使用物理内存,提高数据读写访问的操作效率。

4、映射文件中写入消息

MappedFile.appendMessage()的核心方法MappedFile.appendMessagesInner()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
//获取当前写的指针
int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) {
//创建一个与MappedFile的共享内存区
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
//设置指针
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
//当前写的指针大于文件的大小则抛出异常
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

追加消息的核心方法Commotlog.doAppend()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

// PHY OFFSET
//写入的位置
long wroteOffset = fileFromOffset + byteBuffer.position();

this.resetByteBuffer(hostHolder, 8);
//创建全局唯一的消息ID,消息ID有16字节,4个字节IP+4个字节的端口号+8字节的消息偏移量
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
//从CommitLog中保存了主题和队列的组合 待写入的偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
//可能是第一次还没有偏移量设置为0
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}

// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}

/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}

final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;

final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
//计算:消息长度 = 消息体的长度+消息主题的长度 +消息属性的长度
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

// Exceeds the maximum message

if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
//如果消息的长度+END_FILE_MIN_BLANK_LENGTH大于剩余的空闲长度
// Determines whether there is sufficient free space
//每一个CommitLog文件至少会空闲8个字节,前4位记录当前文件剩余空间,后四位存储魔数(CommitLog.MESSAGE_MAGIC_CODE)
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE 该消息条目的总长度,4字节
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE 魔数 ,4字节
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC 消息体crc校验码 4字节
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID 消息消费队列的ID 4字节
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET 消息在消息消费队列的偏移量,8字节
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET 消息在CommitLog文件中的偏移量 8字节
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP 消息生产者调用消息发送的API的时间戳 8字节
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST 消息发送者的ip、端口号 8字节
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP 消息存储时间戳,8字节
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS broker服务器的IP+端口号 8字节
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES 消息重试的次数,4字节
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset 事务消息物理偏移量,8字节
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY 消息体内容,bodyLength的长度
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC 主题
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES 消息属性
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
//写到消息队列缓存中
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}

构建消息的基础参数,返回放入缓存的状态及写指针的位置。

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2019/07/19/RocketMQ内存映射/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • 内存映射
  • MMAP
  • rocketMQ

扫一扫,分享到微信

微信分享二维码
RocketMQ消息重试
RocketMQ权限控制
  1. 1. 1、概述
  2. 2. 2、获取映射文件MappedFile
    1. 2.1. 2.1、MappedFile和Commitlog的关系
    2. 2.2. 2.2、获取mappedFileQueue中最后一个mappedFile
  3. 3. 3、创建映射文件MappedFile
  4. 4. 4、映射文件中写入消息
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水