RocketMQ Push Consumer Source Code Analysis (Part 1): Message Pulling

2020-05-29

RocketMQ offers both push and pull consumption modes, and push is usually the first choice. Our project uses it heavily and we have had to tune a number of Consumer-side parameters away from their defaults, some of which always felt ambiguous. In this post we take a close look at how the push mode actually works.

1. Overall Flow

Without further ado, here is a diagram. It is not very polished; the point is to illustrate the important steps in the flow.

We will cover push-mode consumption in three parts:

1. Part 1 (this post) covers the PushConsumer startup process, the message pull process, and how messages are dispatched for consumption.

2. Part 2 covers the whole Rebalance process, including how a newly joined consumer causes the other consumers in the same consumer group to rebalance.

3. Part 3 covers how the consume result status is handled and how the consume offset is reset.

In this part we analyze what happens during PushConsumer startup, and how messages pulled from the Broker end up being pushed to our client code.

Before we start, have you ever wondered about the following?

1. We know the PushConsumer is internally multi-threaded. If multiple threads pull from the same MessageQueue of the same Topic, how is that handled internally?

2. When creating a PushConsumer we can configure consumeThreadMin and consumeThreadMax, the number of consume threads. Is this the size of the thread pool that pulls messages from the broker?

3. When creating a PushConsumer we can configure consumeMessageBatchMaxSize, which suggests messages can be handled in batches. But even if we set it to 100, we get at most 32 messages. Why is that?

4. Why does the PushConsumer configuration include a pull-related parameter, pullInterval, the pull interval? This is supposedly push mode, so what does this parameter affect? (A minimal setup showing all of these parameters follows below.)
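To make these parameters concrete, here is a minimal PushConsumer setup that the rest of the analysis maps onto. This is a sketch: the group name, topic, and name server address are placeholders, and the values shown are the client defaults.

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // placeholder group
        consumer.setNamesrvAddr("127.0.0.1:9876");  // placeholder name server address
        consumer.subscribe("DemoTopic", "*");       // placeholder topic
        consumer.setConsumeThreadMin(20);           // question 2: consume thread pool size (default 20)
        consumer.setConsumeThreadMax(20);           // default 20
        consumer.setConsumeMessageBatchMaxSize(1);  // question 3: max messages per listener call (default 1)
        consumer.setPullBatchSize(32);              // max messages requested per pull (default 32)
        consumer.setPullInterval(0);                // question 4: delay between two pulls, in ms (default 0)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // business processing goes here
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}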

2. Push Consumer Startup and Initialization

DefaultMQPushConsumer.start() actually delegates to DefaultMQPushConsumerImpl.start(), so let's focus on that:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            // validate the configuration
            this.checkConfig();
            // copy the subscriptions (<topic, tag>) into rebalanceImpl.subscriptionInner (a Map);
            // in clustering mode the consumer group's retry topic is added as well
            this.copySubscription();
            // in clustering mode, if the instance name is still DEFAULT, replace it with the client process PID
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // get or create the client instance (very important)
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // populate the rebalanceImpl instance
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            // pullAPIWrapper wraps the message pull API
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            // pick the offset store: clustering mode keeps the consume offset on the broker (RemoteBrokerOffsetStore),
            // broadcasting mode keeps it locally on the client (LocalFileOffsetStore)
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // load the consume offsets
            this.offsetStore.load();

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                // create the ConsumeMessageConcurrentlyService and hand it the listener
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            // start the consume message service
            this.consumeMessageService.start();
            // register the consumer with the remote Broker
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // start the client instance (the core method)
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // update the topic route/subscription info
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    // have the broker validate some of this consumer's settings
    this.mQClientFactory.checkClientInBroker();
    // send a heartbeat to the brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // trigger an immediate rebalance
    this.mQClientFactory.rebalanceImmediately();
}

2.1 Creating the client instance (MQClientInstance)

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // clientId = client IP + "@" + instanceName (DEFAULT unless set; in clustering mode replaced by the PID) + "@" + unitName (if set)
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}
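For reference, ClientConfig.buildMQClientId() assembles that id roughly as follows (a simplified sketch of the real method):

// Simplified sketch of ClientConfig.buildMQClientId():
// clientId = clientIP @ instanceName [@ unitName]
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}

Because factoryTable is keyed by this clientId, consumers in the same JVM that share an IP and instanceName also share a single MQClientInstance. Next, the MQClientInstance constructor itself: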
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    this.mQAdminImpl = new MQAdminImpl(this);
    // the message pull service (a thread)
    this.pullMessageService = new PullMessageService(this);
    // the consumer rebalance service (a thread)
    this.rebalanceService = new RebalanceService(this);
    // internal producer used to send consume-failed messages back to the broker as delayed messages
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    // consumer statistics manager
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
}

Notice that creating the client instance also creates several heavyweight objects: pullMessageService (the message pull service), rebalanceService (the rebalance service), and so on.

2.2 MQClientInstance Startup

mQClientFactory.start();

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                this.mQClientAPIImpl.start();
                // start the various scheduled tasks
                this.startScheduledTask();
                // start the message pull service (no rebalance has happened yet, so pullRequestQueue is still empty)
                this.pullMessageService.start();
                // start the rebalance service (waits up to 20 seconds before rebalancing)
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

2.2.1 Starting the scheduled tasks: startScheduledTask

All of these timers come up later in the analysis: updating topic route info, sending heartbeats from the Client to every Broker, persisting consume offsets after consumption, and so on.

private void startScheduledTask() {
    // fetch the name server address periodically (every 2 minutes) if it was not configured
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    // update topic route info, every 30 seconds by default
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    // send a heartbeat to all brokers (every 30 seconds by default, configurable)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    // persist consume offsets, every 5 seconds by default
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // adjust the consume thread pool size (the implementation is currently a no-op)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

2.2.2 Starting the message pull service: pullMessageService

pullMessageService.start(): we saw that the pull service object was created together with the MQClientInstance; now it gets started. It is a thread-based service, so starting it invokes its run() method, PullMessageService.run():

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

public void run() {
    log.info(this.getServiceName() + " service started");
    // stopped is normally only set when the client shuts down
    while (!this.isStopped()) {
        try {
            // take a pull request (message queue, process queue, consumer group) from the queue
            PullRequest pullRequest = this.pullRequestQueue.take();
            // pull the messages (core method)
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

This is effectively an infinite loop that takes one PullRequest at a time from a LinkedBlockingQueue. The PullRequest is the key object driving message pulls, and we keep analyzing it throughout this part; a sketch of how requests get back into this queue follows.
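Besides the initial rebalance, the two executePullRequest* helpers used throughout this post are what feed this loop. A simplified sketch, close to the real PullMessageService code:

// Simplified sketch: how PullMessageService re-queues a PullRequest.
// executePullRequestImmediately puts it straight back into pullRequestQueue,
// so the take() in run() above picks it up for the next pull.
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

// executePullRequestLater does the same, but only after a delay
// (e.g. the 50 ms flow-control delay, or a configured pullInterval).
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS);
}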

2.2.3 Starting the rebalance service: rebalanceService

rebalanceService.start(): the rebalance service was also created together with the MQClientInstance; now it gets started. It too is a thread-based service, so starting it invokes its run() method, RebalanceService.run():

public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // wait up to 20 seconds before the next rebalance
        this.waitForRunning(waitInterval);
        // the core rebalance method
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

So this is essentially a periodic task that performs a rebalance every 20 seconds; the details are covered in 《RocketMQ推送消费源码分析(二)-重平衡》.

2.3 Follow-up steps after startup

// update the topic route/subscription info
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// have the broker validate some of this consumer's settings
this.mQClientFactory.checkClientInBroker();
// send a heartbeat to the brokers
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// trigger an immediate rebalance
this.mQClientFactory.rebalanceImmediately();

As described in section 2.2.1, the client already sends a heartbeat to the Brokers every 30 seconds. Sending one immediately here lets the Brokers learn right away that a new Consumer has been created, laying the groundwork for the upcoming rebalance. Both this and rebalanceImmediately() are covered in 《RocketMQ推送消费源码分析(二)-重平衡》.

3. Message Pulling

In section 2.2.2 we saw that the core method pullMessage(pullRequest) pulls the messages, and that the PullRequest comes out of the queue. It is put there during rebalance; a later part analyzes that. Here we go straight to the pull itself, following DefaultMQPushConsumerImpl.pullMessage():

public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    // other code omitted......
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    // flow control: is the number of messages cached in the process queue above the threshold (default 1000)?
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        // wait 50 ms before putting the pullRequest back into PullMessageService.pullRequestQueue for the next round
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        // warn once every 1000 flow-control hits; it means the process queue is consuming slowly
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    // flow control: is the cached message size above the threshold (default 100 MiB)?
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        // wait 50 ms before putting the pullRequest back into PullMessageService.pullRequestQueue for the next round
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        // warn once every 1000 flow-control hits; it means the cached messages are large
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    // concurrent (non-orderly) consumption
    if (!this.consumeOrderly) {
        // flow control: is the offset span of cached messages above the threshold (default 2000)?
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        // orderly consumption: the process queue must be locked
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    // get the subscription data
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    // record the start time
    final long beginTimestamp = System.currentTimeMillis();

    // the callback that handles the pulled messages
    PullCallback pullCallback = new PullCallback() {
        // other code omitted....section 4 covers the message handling
    };

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        // read the consume offset from the in-memory cache
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    // get the subscription data
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        // whether to re-send the subscription on every pull request (false by default)
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // pull the messages
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}

3.1 Validation and flow control

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {

if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {

These checks throttle the pull rate: the process queue may cache no more than 1000 messages (pullThresholdForQueue), no more than 100 MiB of message bytes (pullThresholdSizeForQueue), and the offset span of cached messages may not exceed 2000 (consumeConcurrentlyMaxSpan). If any condition is hit, the pull request is parked for 50 ms and then recycled for the next attempt. These thresholds are tunable, as sketched below.
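All three thresholds are plain consumer properties, so they can be adjusted on the DefaultMQPushConsumer instance when the defaults do not fit. A sketch, with the default values shown:

// Consumer-side flow control knobs (values shown are the defaults):
consumer.setPullThresholdForQueue(1000);       // max cached messages per queue
consumer.setPullThresholdSizeForQueue(100);    // max cached message size per queue, in MiB
consumer.setConsumeConcurrentlyMaxSpan(2000);  // max offset span of cached messages (concurrent mode)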

3.2 Pulling messages from the broker

After flow control passes, the client pulls from the broker via this.pullAPIWrapper.pullKernelImpl(...). On the broker side this ultimately invokes PullMessageProcessor.processRequest, which contains one core call:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    // validation and parameter extraction omitted.....
    // core method: fetch the messages from the store
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    // post-processing of the result and response construction omitted.....
    return response;
}

Let's look at that getMessage method:

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {

    // other code omitted.....

    GetMessageResult getResult = new GetMessageResult();
    // max physical offset of the commit log
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    // look up the (unique) consume queue by topic and queue id
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        // offset range checks omitted.....
        SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
        if (bufferConsumeQueue != null) {
            // other code omitted......

            int i = 0;
            // upper bound on how many consume queue entries may be scanned/filtered per pull
            final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            // iterate over the message position entries
            for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // physical offset of the message
                int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // message size
                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // message tagsCode
                // record the max physical offset pulled so far
                maxPhyOffsetPulling = offsetPy;
                // if offsetPy is below nextPhyFileStartOffset, the corresponding message has been removed;
                // skip ahead until a readable message is found
                if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                    if (offsetPy < nextPhyFileStartOffset)
                        continue;
                }
                // check whether this part of the commit log no longer fits in memory and has fallen to disk
                boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                // decide whether the batch is full and should be returned to the Client
                if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                    isInDisk)) {
                    break;
                }
                // various checks omitted....

                // fetch the message (core method)
                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

                // other code omitted....
            }
        }
    }
    return getResult;
}

Checking whether a message is on disk or in memory:

private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
    // the portion of physical memory the commit log is assumed to stay resident in (default ratio 40%)
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    // if the message lags further behind the newest offset than that, treat it as on disk
    return (maxOffsetPy - offsetPy) > memory;
}

The formula: total physical memory × accessMessageInMemoryMaxRatio (default 40%). If the message's distance from the newest commit log offset exceeds that amount, it is assumed to have been paged out to disk.
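A worked example, assuming a broker host with 16 GiB of physical RAM and the default ratio of 40:

memory = 16 GiB × 40% = 6.4 GiB
isInDisk ⇔ (maxOffsetPy - offsetPy) > 6.4 GiB

So a message lagging more than 6.4 GiB behind the newest commit log offset is treated as served from disk.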

And deciding when the pulled batch is full (count or size reaching the limits) and should be returned to the Client:

this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)

private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
    if (isInDisk) {
        // max bytes per transfer when reading from disk (1024 * 64)
        if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
            return true;
        }
        // max message count per transfer when reading from disk (8)
        if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
            return true;
        }
    } else {
        // max bytes per transfer when reading from memory (1024 * 256)
        if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
            return true;
        }
        // max message count per transfer when reading from memory (32)
        if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
            return true;
        }
    }
    return false;
}

So the broker itself limits the size and count of messages returned to the Client, and the limits differ depending on whether the messages are read from disk or from memory. This answers our third question: raising the batch size on the consumer does not help, because the Broker-side parameters already cap each pull at 32 messages (when served from memory).
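If larger batches are really needed, these limits are MessageStoreConfig properties and can be raised in broker.conf. Shown here with their default values (raise them with care, since they exist to protect the broker):

maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInDisk=8
maxTransferBytesOnMessageInDisk=65536

Back on the read path, commitLog.getMessage(offsetPy, sizePy) looks up the actual message bytes: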

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    // size of a single MappedFile (commit log segment)
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    // locate the MappedFile that contains the message at this offset
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        // position relative to the start of that MappedFile
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

That is how the message bytes are fetched. We do not dig into MappedFile in this part; for an in-depth look, see 《RocketMQ内存映射》 and 《RocketMQ存储文件》, which focus on the message storage files and their memory mapping.

4. Client-Side Message Handling

In section 3 we skipped over the client-side handling of the messages pulled back from the broker. Let's analyze it now: PullCallback pullCallback = new PullCallback()

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            // post-process the result pulled from the Broker (decode, filter, set min/max offsets)
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    // how long this pull round-trip took
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);
                    long firstMsgOffset = Long.MAX_VALUE;
                    // check whether the pulled message list is empty
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // core method: submit the messages for asynchronous consumption and result handling
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        // pull interval, default 0: recycle the pullRequest and pull again immediately
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                    // omitted......
                    break;
                case NO_MATCHED_MSG:
                    // omitted......
                    break;
                case OFFSET_ILLEGAL:
                    // omitted......
                    break;
                default:
                    break;
            }
        }
    }
};

We only analyze the case where messages were retrieved (FOUND); the other statuses are straightforward.

4.1 Updating the process queue state

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()): this call records the cached message count, size, and maximum offset span on the processQueue, which is exactly the state the flow control in section 3.1 reads. A simplified sketch of it follows.
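This sketch omits parts of the real ProcessQueue (transaction message accounting, the dropped flag, etc.) but shows how the counters are maintained under a write lock:

// Simplified sketch of ProcessQueue.putMessage: messages live in a TreeMap keyed by
// queue offset; msgCount and msgSize are the counters read by the flow control in 3.1.
public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);
            // getMaxSpan() used in 3.1 is simply msgTreeMap.lastKey() - msgTreeMap.firstKey()
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putMessage exception", e);
    }
    return dispatchToConsume;
}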

4.2 Submitting the pulled messages for consumption

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // the max number of messages handed to the listener per batch, configured on the consumer
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // if the pulled batch fits into one consume batch, submit it directly
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // submit to the consume thread pool
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // resubmit after 5 seconds
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // otherwise split the pulled batch into chunks of consumeBatchSize
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

Here we finally meet consumeBatchSize, the consumer-side parameter, in its real role: a batch pulled from the broker is split into consume requests of at most this many messages each (e.g. 32 pulled messages with the default consumeMessageBatchMaxSize of 1 produce 32 ConsumeRequests). If a submission is rejected, it is resubmitted 5 seconds later.

Now let's look at how the consumeExecutor thread pool is created:

this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));

Here are ConsumeThreadMin and ConsumeThreadMax, both consumer-side parameters with a default of 20. This answers the earlier question: these parameters do not control pulling from the Broker; they size the thread pool that processes messages after they have been pulled to the Client. (Note that consumeRequestQueue is an unbounded LinkedBlockingQueue, so in practice the pool never grows beyond consumeThreadMin.)

The ConsumeRequest object is the Runnable that actually processes the messages in its run() method; we cover it in detail in 《RocketMQ推送消费源码分析-消费处理》.

4.3 Recycling the PullRequest

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

Here is the familiar pullInterval parameter again. The messages have already been dispatched to the thread pool for asynchronous processing, so the pullRequest can now be recycled: it goes back into the pullRequestQueue from section 2.2.2. In other words, PullRequests are not only enqueued by the initial rebalance; each one is recycled after its pull completes. Since a message queue is assigned to only one consumer, this amounts to pulling each message queue's messages in an effectively single-threaded fashion.

pullInterval is simply the configurable delay before the pullRequest is recycled, i.e. the interval between two consecutive pulls.

With that, we have answered all the questions raised at the beginning.

Author: 顾明训
Link: https://www.itzones.cn/2020/05/29/RocketMQ推送消费源码分析-消息拉取/
Copyright: This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. Please credit the source when reposting!