IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

RocketMQ推送消费源码分析(二)-重平衡

2020-05-31

上一节我们讲到了push模式的消费下的消息拉取过程中对于如何获取拉取资源没有想到,本节我们重点讲解下,集群模式下消费者的增加会触发消费的重平衡来保证每个消费者“分担”消息的消费。

1、背景

我们直接上图,图中的②就是我们本节要讲解的内容。

2、客户端发送心跳到每一个Broker

上节我们分析了DefaultMQPushConsumer.start()的启动会发送心跳和开启重平衡

1
2
3
4
//客户端发送心跳包给broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//消息消费重新负载
this.mQClientFactory.rebalanceImmediately();

我们先分析下心跳发现核心方法是MQClientInstance.sendHeartbeatToAllBroker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void sendHeartbeatToAllBroker() {
//准备心跳数据
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer");
return;
}
if (!this.brokerAddrTable.isEmpty()) {
//记录心跳的总次数
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
//获取Broker的列表信息
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
//给每一个Broker发送心跳
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}

try {
//发送给broker端心跳,获取最新的版本号
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
}

2.1、心跳的数据内容

我们看下发给broker的是什么数据,Client端的实例不仅有消费者同时也可给生产者使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
//客户端ID,客户端IP+"@"+instanceName(客户端设置的不设置集群模式默认是DEFAULT,会转换为PID)+"@"+unitName(自己设置的)
heartbeatData.setClientID(this.clientId);
//消费者信息consumerTable的Map结构中是<consumerGroup,MQConsumerInner> 消费组和消费者实例
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
//消费者组名
consumerData.setGroupName(impl.groupName());
//消费类型PULL(拉取)、PUSH(推送)
consumerData.setConsumeType(impl.consumeType());
//消费模式CLUSTERING(集群)、BROADCASTING(广播)
consumerData.setMessageModel(impl.messageModel());
//消费的起始位置:CONSUME_FROM_FIRST_OFFSET(队列头部)、
//CONSUME_FROM_LAST_OFFSET(队列尾部【默认】)
//CONSUME_FROM_TIMESTAMP(时间点)
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
//订阅的主题信息
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());

heartbeatData.getConsumerDataSet().add(consumerData);
}
}
//生产者信息producerTable的Map结构中是<producerGroup,MQProducerInner> 生产者组和生产者实例
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());

heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}

主要是当前这个客户端实例中包含的生产者信息和消费者信息

2.2、broker端心跳的处理

Broker处理心跳是在ClientManageProcessor中处理的。对于Consumer的处理就稍微复杂一点,我们分析下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
//获取心跳数据
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
//客户端通道信息
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);

for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
//根据消费组名获取订阅组的配置信息
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
//省略部分代码......
//判断当前的消费者是否是新的
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);

if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
//简单的注册生产者
for (ProducerData data : heartbeatData.getProducerDataSet()) {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

里面最重要的方法则是this.brokerController.getConsumerManager().registerConsumer(…)注册Consumer的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
//消费组信息对象,包含订阅者的消费组名及消费者客户端的通道集合信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
//判断是否客户端的通道有改变(一般就是注册)消费组信息中会维护所有的消费者客户端
//格式ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
//判断是否订阅信息变了
boolean r2 = consumerGroupInfo.updateSubscription(subList);

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
//通知消费者组内的其他消费者执行重平衡
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
//注册
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

return r1 || r2;
}

2.3、Broker端通知组内的消费者开启重平衡

就是说消费者组内的消费者改变或者订阅信息改变都会执行重平衡,consumerIdsChangeListener.handle(…)我们是注册的所以一定会执行这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:
if (args == null || args.length < 1) {
return;
}
//获取所有的客户端的通道信息列表
List<Channel> channels = (List<Channel>) args[0];
//默认notifyConsumerIdsChangedEnable是开启的
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
//通知每一个消费者
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
case UNREGISTER:
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER:
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}

至此我们分析了原来客户端发送心跳不仅是检测客户端是否在线,同时保证消费者注册和订阅信息改变来触发重平衡。

我们看下Broker调用的客户端的方法ClientRemotingProcessor.notifyConsumerIdsChanged()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
final NotifyConsumerIdsChangedRequestHeader requestHeader =
(NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getConsumerGroup());
//立刻执行rebalance
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
}
return null;
}

看到mqClientFactory.rebalanceImmediately()这个我们是否比较熟悉那就是我们开始提到的执行心跳的同时也执行重平衡。

3、客户端的重平衡

mqClientFactory.rebalanceImmediately()最后我们跟踪到其核心代码是RebalanceImpl.doRebalance();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void doRebalance(final boolean isOrder) {
//消费者的订阅信息(集群模式下包含重试主题)
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//根据主题执行重平衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}

3.1、重平衡分配策略

根据主题执行重平衡this.rebalanceByTopic(topic, isOrder);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
//1.从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2.从broker端获取消费该消费组的所有客户端clientId,每个消费者都需要向broker发送心跳包
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序
Collections.sort(mqAll);
Collections.sort(cidAll);

//3.创建DefaultMQPushConsumer对象时默认设置为AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
//4.调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
//5.将分配得到的allocateResult 中的队列放入allocateResultSet 集合
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//6.更新updateProcessQueue
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

广播模式下的重平衡只能是订阅者信息变了,我们不做讨论,广播模式下每个消费者都会独享消费订阅的全部消息

集群模式比较复杂我们分析下,我们需要获取订阅者的主题的消息队列MessageQueue集合,需要获取该订阅组的消费者客户端的集合,对其进行排序供下面算法使用。

提供了很多种算法我们本节不做讨论需要了解的请查看 《RocketMQ消费模式》1.2章节中重点介绍了其分配策略

至此每个客户端都会执行同一个分配策略,这个就能保证每个消费者只消费自己管理的MessageQueue的消息

3.2、客户端获取的消费队列集合放入拉取资源中

获取到了消费队列的集合,下面我们分析如何放入拉取资源中,就是我们在上一节 RocketMQ推送消费源码分析-消息拉取提到的,消息的拉取任务是从从LinkedBlockingQueue对列中获取的PullRequest对象。

我们接着分析updateProcessQueueTableInRebalance()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
//排除不在重平衡后本次消费队列中的资源
if (mq.getTopic().equals(topic)) {
//当前topic所在的队列,不在重平衡后的分配的队列不在mqSet中
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {//在拉取的时候更新lastPullTimestamp的值,然后在rebalance的时候会去判断ProcessQueue已经超过一定的时间没有去拉取消息,如果是的话移除
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
//构建PullRequest列表
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}

this.removeDirtyOffset(mq);
//新建处理队列对象
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//很重要将分配好的消息队列放到PullMessageService.pullRequestQueue队列中(PullMessageService消息拉取的核心类)
this.dispatchPullRequest(pullRequestList);
return changed;
}

构建了PullRequest对象,放入消费者队列和处理队列,分析下this.dispatchPullRequest(pullRequestList)

1
2
3
4
5
6
7
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
//放入到LinkedBlockingQueue<PullRequest> pullRequestQueue
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}

循环放入到LinkedBlockingQueue<PullRequest> pullRequestQueue 拉取请求的资源队列中,供拉取任务使用

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/05/31/RocketMQ推送消费源码分析-重平衡/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • rebalance
  • rocketMQ

扫一扫,分享到微信

微信分享二维码
RocketMQ推送消费源码分析(三)-消费处理
RocketMQ推送消费源码分析(一)-消息拉取
  1. 1. 1、背景
  2. 2. 2、客户端发送心跳到每一个Broker
    1. 2.1. 2.1、心跳的数据内容
    2. 2.2. 2.2、broker端心跳的处理
    3. 2.3. 2.3、Broker端通知组内的消费者开启重平衡
  3. 3. 3、客户端的重平衡
    1. 3.1. 3.1、重平衡分配策略
    2. 3.2. 3.2、客户端获取的消费队列集合放入拉取资源中
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水