IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

RocketMQ推送消费源码分析(三)-消费处理

2020-06-01

在 《RocketMQ推送消费源码分析(一)-消息拉取》一文中我们分析了消息的拉取过程,但是并没有分析消息在本地业务处理后返回成功或者失败后的源码是如何处理的,本节我们重点分析下。

1、背景

上两节我们分析了①②部分本节我们重点分析③从Broker获取消息后的处理。

2、消息处理

我们接着第一节分析的获取到的消息提交到ConsumeRequest多线程中执行,核心方法无疑就是run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
//消费者注册的listener
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
//如果消息来自延迟队列则设置其主题为%RETRY_TOPIC%+consumerGroup
//namespace一般不做设置不做分析
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
//目前只是开启了消息轨迹才会有Hook这个,(消息处理前执行)
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
//客户端程序消费消息的开始时间
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
//核心方法,获取返回的状态(CONSUME_SUCCESS/RECONSUME_LATER)
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
//客户端消费消息的时间
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
//客户端15分钟没有返回消费结果认定为超时
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
//开始可消息轨迹
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
//返回异常或者什么都没返回认为消费失败
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//目前只是开启了消息轨迹才会有Hook这个,(消息处理前执行)
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//判断处理队列是否被删除
if (!processQueue.isDropped()) {
//处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

从源码中我们分为三部分来说明,

第一部分就是消息处理前,需要判断该主题是否是延迟消息,如果是则设置其主题为%RETRY_TOPIC%+consumerGroup真正的主题,并且判断是否有“钩子”Hook目前只有开始了消息轨迹才会有,消息轨迹不是我们本节重点我们不做讲解。

第二部分就是讲消息发送到注册的监听器Listener中,等待返回结果,超过15分钟认为超时,也是认为本次消息处理失败,作为RECONSUME_LATER处理

第三部分就是我们本节重点,对与不同状态的消息的处理ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this)。

3、消费后状态的处理

重点分析下ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public void processConsumeResult(final ConsumeConcurrentlyStatus status
,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;

//根据状态做消费成功失败统计(运维方面),设置ackIndex
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
//相关处理
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//广播模式下失败了不会做处理,打印相关的日志
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//返回RECONSUME_LATER才会进入
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//发送延迟消息到broker(重要)
boolean result = this.sendMessageBack(msg, context);
//发送失败放入msgBackFailed集合中
if (!result) {
//消费次数+1
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
//放入失败的集合中
msgBackFailed.add(msg);
}
}
//1.消费结果是CONSUME_SUCCESS不会进入
//2.消费结果是RECONSUME_LATER,并且发送延迟消息失败的才会进入(做相关失败的补偿,客户端继续消费延迟5秒并增加消费次数)
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//5秒钟后再次消费,此时的消费次数已经+1(开始了一轮客户端消费)
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

分析下源码

第一部分设置ackIndex如果返回的是CONSUME_SUCCESS(消费成功),则设置成该批消费的消息的数量,如果返回的是RECONSUME_LATER则设置成-1,表示这批消息消费失败。

第二部分广播模式和集群模式是截然不同的处理方式。广播模式成功的不需要什么特殊处理,返回消费失败了只是记录失败的消息的日志不做其他处理,相当于失败就是丢弃。集群模式则不然需要对失败的消息进行相关的处理。下面进行讲解。

第三部分对应消费后的消息的消费位移进行处理,消费位移保存到本地。

3.1、广播模式消费失败处理

我们看源码分析出,只是记录失败的消息的日志不做其他处理,相当于失败就是丢弃。

log.warn(“BROADCASTING, the message consume failed, drop it, {}”, msg.toString());

3.2、集群模式消费失败处理

集群模式下的消费失败后,通知Broker对失败的消息进行处理this.sendMessageBack(msg, context);。并且处理失败后还做了一次补偿机制延迟5秒后继续提交给客户端进行消费处理,相当与从Broker拉取到的消息进行相关的处理。保证消息不会丢失。追溯源码我们发现发送给Broker消费失败的核心代码是defaultMQPushConsumerImpl.sendMessageBack();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
//发送失败的消息进入,延迟队列,此时延迟级别还是0,服务端会加3
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
//构建延迟队列的消息信息
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
//延迟队列(消息重试也是利用了延迟队列,但是是从第三级开始)
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
//失败了就构建本地客户端的producer发送到broker(补偿机制)
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}

该方法我们分为两部分进行讲解

第一部分通知Broker该消息消费失败,我们分析下Broker端的处理

第二部分如果通知测Broker消费失败没有成功,则进行补偿机制构建延迟消息,延迟级别从第三级开始(客户端Broker就是这么处理的保持一致),启用Producer发送延迟消息,关于延迟消息请参照 《RocketMQ延迟消息》一文中有关于延迟消息的详细讲解及源码分析。

3.2.1、Broker对于消费失败的处理

Broker的处理方法是SendMessageProcessor.consumerSendMsgBack()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

//省略其他代码......

//构建延迟主题的名
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
//构建延迟主题的配置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}

if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
//根据偏移量从broker上获取这个消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
//设置消息的属性为延迟消息
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
//获取延迟级别,默认是0
int delayLevel = requestHeader.getDelayLevel();
//最大消费次数是16
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//判断消费次数是否达到上限16次
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
//初始值的设置,消息构建是默认是0,延迟队列从3开始
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
//已经是延迟队列了,直接设置
msgExt.setDelayTimeLevel(delayLevel);
}
//构建延迟消息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
//消息持久化到broker中
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}

this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);

return response;
default:
break;
}

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}

判断延迟级别参数,其实我们所谓的消息失败后的重试是借助延迟消息进行处理的,延迟级别默认是18个级别分别是”1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;可以配置的,这个参数即会影响延迟消息也会影响消费失败后的什么时候返回消费的时间设置。关于把消息放入到延迟消息中我们就不做分析了。感兴趣的可以参照《RocketMQ延迟消息》和 《RocketMQ内存映射》

广播模式对于消费失败的消息不做处理只是记录日志

集群模式下消费失败会发送失败的消息到broker到延迟消息中,并做了两次补偿机制,第一次broker返回失败则采用启用Producer端发送延迟消息的方式进行补偿,如果还是失败,则采用延迟5秒进行本地的客户端再次消费,相当于又进行一轮消费,这样保证消息一定会被消费端消费

4、消费位移的提交

上面我们分析了消费位移做了保存this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);

这就涉及了两种位移提交了,广播模式下消费位移是保存在客户端本地的并且不会更新broker端的消息的消费状态,集群模式的位移文件是在broekr端的需要远程执行更新操作,我们看下源码。

分别看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
//获取当前队列的之前的偏移量
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}

if (null != offsetOld) {
//是否需要做一次判断,比较旧的偏移量和新的哪一个大,依照大的作为最新的偏移量
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}

我们发现里面的实现是一样的,就不分开讲了,我们发现只是更新了Consuemr对象中的位移。我们回头看一下 《RocketMQ推送消费源码分析(一)-消息拉取》中说了在mQClientFactory.start()启动的时候启动了各种定时器,其中就有一个

1
2
3
4
5
6
7
8
9
10
11
//持久化消息消费进度,默认5秒保存一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

原来消费位移是每隔5秒才会保存一次。在执行定时器方法的时候才会区分执行的是本地的偏移量文件还是远程的Broker端的偏移量文件。

4.1、广播模式提交消费位移

LocalFileOffsetStore.persistAll();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
//偏移量序列化对象
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
//设置每一个消息队列的消费位移
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
//json字符串
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
//写入文件(重要)
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}

我们分为两部分讲解,第一部分将准备每一个消息队里的偏移量对象构建集合,序列化成json字符串

第二部分就是写入偏移量文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void string2File(final String str, final String fileName) throws IOException {
//创建临时文件
String tmpFile = fileName + ".tmp";
string2FileNotSafe(str, tmpFile);
//创建备份文件
String bakFile = fileName + ".bak";
//读取之前的偏移量文件中的内容
String prevContent = file2String(fileName);
if (prevContent != null) {
//存在就创建备份文件
string2FileNotSafe(prevContent, bakFile);
}
//删除原有的偏移量文件
File file = new File(fileName);
file.delete();
//将临时文件重命名为正式偏移量文件
file = new File(tmpFile);
file.renameTo(new File(fileName));
}

我们分析源码发现并不是简单的写入到偏移量文件中,先创建了临时文件并写入了最新的偏移量,最后将之前就的偏移量文件删除后修改临时文件的文件名。我们还发先为了安全偏移量文件每次都做了备份,备份文件没有其他程序访问直接修改。

我们发现这样本地就会存在两份偏移量文件只是有一个是备份文件是上一次的偏移量写入的值。由于定时器一直在执行只要消费完消息后在5秒后两个文件会是相同的。

4.2、集群模式提交消费位移

4.2.1、client端上传消费位移

RemoteBrokerOffsetStore.persistAll();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;

final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
//判断该消息队列是否有偏移量,没有的直接移除
if (mqs.contains(mq)) {
try {
//远程更新broker端的位移
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
}
//无用的移除
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}

我们看到是每一个消费队列去更新到的Broker端,Broker端的处理方法是ConsumerManageProcessor.updateConsumerOffset(…)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
//更新消费位移
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

没什么分析的我们直接看核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
//构建Map的key=topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
//提交
this.commitOffset(clientHost, key, queueId, offset);
}

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
//放入集合中
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
}
}
}

我们发现仅仅是更新本地的Map对象,key(topic@group)是主题和消费组组成,value是一个Map(queueId,offset)记录每一个消费队列的消费位移。我们发现并没有持久化到Broker端的文件中,难道和Client端的处理一样的定时器,那么这个定时器什么时候启动呢,我们猜测是在Broker启动时,因为要加载一堆配置文件并且需要知道你每一个消费队列的消费进度。

4.2.2、Broker定时器写入偏移量到文件中

验证一下我们的猜想

我们在Broker启动时记载的BrokerController.initialize()方法中看到了

1
2
3
4
5
6
7
8
9
10
11
12
//更新消费位移
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//更新消费位移
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

5秒钟执行一次,我们追踪源码查看核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void persist() {
//获取偏移量的JOSN字符串
String jsonString = this.encode(true);
if (jsonString != null) {
//获取配置文件路径rootDir + File.separator + "config" + File.separator + "consumerOffset.json"
String fileName = this.configFilePath();
try {
//写入文件
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}

那我们看下序列化的是什么

1
2
3
4
public String encode(final boolean prettyFormat) {
//this就是本身,就一个成员变量offsetTable记录的就是client端上传的偏移量信息
return RemotingSerializable.toJson(this, prettyFormat);
}

拿到了偏移量文件,文件路径rootDir + File.separator + “config” + File.separator + “consumerOffset.json”

直接写入文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void string2File(final String str, final String fileName) throws IOException {
//创建临时文件
String tmpFile = fileName + ".tmp";
string2FileNotSafe(str, tmpFile);
//创建备份文件
String bakFile = fileName + ".bak";
//读取之前的偏移量文件中的内容
String prevContent = file2String(fileName);
if (prevContent != null) {
//存在就创建备份文件
string2FileNotSafe(prevContent, bakFile);
}
//删除原有的偏移量文件
File file = new File(fileName);
file.delete();
//将临时文件重命名为正式偏移量文件
file = new File(tmpFile);
file.renameTo(new File(fileName));
}

这个是个公共的方法上面已经分析规律,不做分析了,至此broker端的持久化偏移量到文件中分析完了。

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/06/01/RocketMQ推送消费源码分析-消费处理/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • 消费结果处理
  • rocketMQ

扫一扫,分享到微信

微信分享二维码
Redis基本数据类型之Hash
RocketMQ推送消费源码分析(二)-重平衡
  1. 1. 1、背景
  2. 2. 2、消息处理
  3. 3. 3、消费后状态的处理
    1. 3.1. 3.1、广播模式消费失败处理
    2. 3.2. 3.2、集群模式消费失败处理
      1. 3.2.1. 3.2.1、Broker对于消费失败的处理
  4. 4. 4、消费位移的提交
    1. 4.1. 4.1、广播模式提交消费位移
    2. 4.2. 4.2、集群模式提交消费位移
      1. 4.2.1. 4.2.1、client端上传消费位移
      2. 4.2.2. 4.2.2、Broker定时器写入偏移量到文件中
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水