RocketMQ Delayed Messages

2019-07-07

A delayed message is one that is not visible to consumers immediately after it is sent; instead, it is delivered to consumers at a time specified by the user. For example, if we send a message with a 30-second delay, the message reaches the server right away, but the server only hands it to consumers 30 seconds later.

RocketMQ delays delivery to consumers according to configured delay levels, where each level corresponds to a different delay time. The levels are configurable; by default there are 18 of them: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. The supported time units are s (seconds), m (minutes), h (hours), and d (days).

In the source, MessageStoreConfig.java defines it as follows:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

You can customize the delay levels by setting messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h in the broker configuration.
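To make the level-to-delay mapping concrete, here is a minimal sketch that parses such a messageDelayLevel string into a level-to-milliseconds table. It is modeled on what ScheduleMessageService.parseDelayLevel() does when the broker starts, but simplified for illustration; the class and method names below are mine, not the broker's.

import java.util.HashMap;
import java.util.Map;

public class DelayLevelParser {

    // Parse "1s 5s 10s ... 2h" into {1 -> 1000, 2 -> 5000, 3 -> 10000, ...}
    public static Map<Integer, Long> parse(String messageDelayLevel) {
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 1000L * 60);
        unitMillis.put("h", 1000L * 60 * 60);
        unitMillis.put("d", 1000L * 60 * 60 * 24);

        Map<Integer, Long> delayLevelTable = new HashMap<>();
        String[] levels = messageDelayLevel.split(" ");
        for (int i = 0; i < levels.length; i++) {
            String value = levels[i];
            String unit = value.substring(value.length() - 1);
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            // Levels are 1-based: level 1 is the first entry, level 3 is "10s", and so on
            delayLevelTable.put(i + 1, num * unitMillis.get(unit));
        }
        return delayLevelTable;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println("delay level 3 -> " + table.get(3) + " ms"); // 10000 ms, i.e. 10s
    }
}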

1. Code Verification

Prerequisite: start the consumer first and let it wait for messages. If you send messages before the consumer is up, the time the consumer needs to start will skew the test results.

1.1 Producer

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class DelayProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 10; i++) {
            try {
                // Build the message
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("delayed message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                // Delay level 3 corresponds to 10s: the broker delivers the message 10 seconds after it is sent
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sd.format(new Date()) + " == " + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

Check the results:

1.2 Consumer

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class DelayConsumer {

    public static void main(String[] args) {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_delay");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for (MessageExt msg : paramList) {
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            // Print the receive time and message body
                            System.out.println("receive time : " + sd.format(new Date()) + " == MessageBody: " + msgbody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry later
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // consumed successfully
                }
            });
            consumer.start();
            System.out.println("DelayConsumer === started successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Check the results:

2. Internal Mechanism Analysis

Let's look at the core method on the message storage path, org.apache.rocketmq.store.CommitLog.putMessage:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the timestamp at which the message is stored
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery: is the message's delay level greater than 0?
        if (msg.getDelayTimeLevel() > 0) {
            // If the delay level exceeds the maximum, cap it at the maximum delay level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Change the message topic to SCHEDULE_TOPIC_XXXX
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // Change the queue ID to the delay queue for this delay level
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            // Save the message's original topic and queue ID into its properties
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Get the last mapped file; mappedFileQueue can be viewed as the mapping of the files under the CommitLog directory
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // Acquire putMessageLock before writing, i.e. writes to the CommitLog file are serialized
    putMessageLock.lock(); // spin or ReentrantLock, depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        // Set the message's store timestamp
        msg.setStoreTimestamp(beginLockTimestamp);
        // mappedFile == null means no CommitLog file exists yet; create one for the first message
        // mappedFile.isFull() means the mapped file is full and a new CommitLog file is needed
        if (null == mappedFile || mappedFile.isFull()) {
            // The parameter 0 is the start offset
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // mappedFile == null here means creating the CommitLog file failed, possibly due to insufficient disk space or permissions
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        // Append the message to the mapped file
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        // Release the lock
        putMessageLock.unlock();
    }

    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Flush to disk
    handleDiskFlush(result, putMessageResult, msg);
    // Master-slave (HA) replication
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

We can see that in putMessage, a delayed message is stored in the CommitLog under the topic SCHEDULE_TOPIC_XXXX, with its original topic and queue ID saved in the message properties. These messages are then re-delivered later by a scheduled task.
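As used in putMessage above, the queue under SCHEDULE_TOPIC_XXXX is chosen by ScheduleMessageService.delayLevel2QueueId(), which in the RocketMQ 4.x source boils down to subtracting one from the delay level (so the level-3 messages from our producer end up in queue 2):

// Delay levels are 1-based, queue IDs under SCHEDULE_TOPIC_XXXX are 0-based
public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}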

When ScheduleMessageService.start() runs, it creates one timer task per delay level, and each task corresponds to one consume queue under the SCHEDULE_TOPIC_XXXX topic.

public void start() {

    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }
        if (timeDelay != null) {
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
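FIRST_DELAY_TIME is the delay before each level's task first fires, and the second timer periodically flushes the per-level pull progress (offsetTable) to disk. For reference, the scheduling constants in the 4.x ScheduleMessageService are roughly the following; the wrapper class below is only a reading aid, and the exact values may differ between versions:

// Reference copy of the timing constants used by the scheduling code in this post
// (RocketMQ 4.x; the wrapper class is mine, the constant names/values are the broker's)
public class ScheduleTimingConstants {
    public static final long FIRST_DELAY_TIME = 1000L;    // first run of each level's task: after 1s
    public static final long DELAY_FOR_A_WHILE = 100L;    // normal re-schedule interval after scanning a queue: 100ms
    public static final long DELAY_FOR_A_PERIOD = 10000L; // back-off when re-putting a due message fails: 10s
}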

The timer task is implemented by DeliverDelayedMessageTimerTask, whose core method is executeOnTimeup:

public void executeOnTimeup() {
    // Find the consume queue for this delay level
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;
    // If not found, there are currently no messages at this delay level; skip this round
    if (cq != null) {
        // Get the valid entries of the queue starting from offset
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

                // Iterate over the ConsumeQueue; each entry is 20 bytes
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // Physical offset in the CommitLog
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // Message size
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // Tag hash code (for delay-queue entries this field holds the delivery timestamp)
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            // can't find ext content. So re-compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    long countdown = deliverTimestamp - now;
                    if (countdown <= 0) {
                        // Look up the message in the CommitLog by its physical offset and size
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);
                        if (msgExt != null) {
                            try {
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                // Store the message back into the CommitLog; it is then dispatched to the
                                // original topic's consume queue so consumers can pick it up again
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // Not due yet: re-schedule after the remaining countdown and record the progress
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } else {
            // No valid entries found: correct the pull progress for this delay queue if it is behind
            // the queue's minimum offset, and schedule another attempt later
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    }
    // Schedule the next run of this delay-level task
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

The overall flow:

1. The producer sends a message. If the message's DelayTimeLevel is greater than 0, the broker changes its topic to SCHEDULE_TOPIC_XXXX and its queue ID to DelayTimeLevel - 1.

2. The message is dispatched from the CommitLog into the consume queue of SCHEDULE_TOPIC_XXXX that corresponds to its delay level.

3. The Timer task periodically (about once per second) pulls the entries from that consume queue, starting from the offset recorded on the previous pull.

4. The message is read from the CommitLog using its physical offset and size. (This will be covered in detail in the chapter on message storage.)

5. A new message is rebuilt from the stored properties: the original topic (TopicTest) and original queue ID are restored, the DelayTimeLevel property is cleared, and the message is written back into the CommitLog (see the sketch after this list).

6. The message offset is recorded in the consume queue of the original topic TopicTest, so consumers can look the message up and consume it.
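Step 5 is handled by DeliverDelayedMessageTimerTask.messageTimeup(), which executeOnTimeup() calls before re-putting the message. The sketch below illustrates the idea of that restoration under RocketMQ 4.x packages; it is a simplified illustration, not a copy of the broker code, and the class and method names are mine:

import org.apache.rocketmq.common.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.MessageExtBrokerInner;

public class DelayMessageRestore {

    // Rebuild the original message from a due SCHEDULE_TOPIC_XXXX message:
    // restore the real topic/queue ID and drop the delay level so it is not delayed again.
    public static MessageExtBrokerInner restoreOriginalMessage(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        // Take the backed-up topic and queue ID out of the properties...
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

        // ...and clear the delay level so putMessage() stores it under the real topic this time.
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        msgInner.setWaitStoreMsgOK(false);

        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        // (the real code also refreshes the propertiesString and tagsCode; omitted here)
        return msgInner;
    }
}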

Author: 顾明训
Link: https://www.itzones.cn/2019/07/07/RocketMQ延迟消息/
License: This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. Please credit the source when reposting!