A colleague asked me a question about RocketMQ

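Out of the blue, a colleague asked me: when a RocketMQ message is retried multiple times, does its msgId change? Even after it has landed in the DLQ?

At first, going on experience, I said it does not change, because every time I had debugged an issue I could find the logs of all the retried consumptions with the same msgId. To be more certain, I dug into the source code. I read version 4.6.1, partly because that is what my company runs and partly because it is the version I dug through last time...

Message retry

Since the question is about retries, let's start from the client-side logic for a failed consumption and look for clues. Below is the code that sends a message back to the broker after consumption fails: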

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
case CLUSTERING:
    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
    // On consumption failure, ackIndex = -1, so every message in the batch is sent back
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        MessageExt msg = consumeRequest.getMsgs().get(i);
        boolean result = this.sendMessageBack(msg, context);
        if (!result) {
            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            msgBackFailed.add(msg);
        }
    }

As shown above, a failed consumption calls sendMessageBack:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
// org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
// The delay level corresponds to the broker's 16 delay tasks; it can be set through
// ConsumeConcurrentlyContext, and by default the broker's policy is used
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    request, timeoutMillis);

You can see that it sends the broker a RequestCode.CONSUMER_SEND_MSG_BACK request carrying the offset, originMsgId, and so on. At first I thought the answer lay in this originMsgId: the original msgId is sent back, so the msgId would not change. That turned out not to be the case. On to the broker-side handling:

// org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

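The code is long, so I have omitted parts of it. The originMsgId in the request is not actually used. Instead, the broker locates the message by offset and takes its msgId directly (if this is not the first retry, the value is read from PROPERTY_ORIGIN_MESSAGE_ID in the properties). But this still has nothing to do with msgId itself. At this point I guessed that the msgId might be reset to originMsgId when the message is committed to the commitlog. Reading on: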

// org.apache.rocketmq.store.CommitLog#putMessage
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }

    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Backup real topic, queueId
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

....
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
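
To make the rerouting in the excerpt above easier to follow, here is a minimal standalone sketch of the same idea. It does not use the RocketMQ classes: `Msg` and `reroute` are hypothetical names, and the property keys "REAL_TOPIC" and "REAL_QID" as well as the "delay level minus one" queue mapping are my recollection of the constants behind PROPERTY_REAL_TOPIC, PROPERTY_REAL_QUEUE_ID and delayLevel2QueueId, so treat them as assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class DelayRerouteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    // Hypothetical, stripped-down message: just the fields the rerouting touches.
    static class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();
    }

    // Mirrors the excerpt: clamp the level, back up the real destination, swap in the schedule topic.
    static void reroute(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, nothing to do
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        // Assumed property keys; the broker needs them to restore the real destination later.
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // assumed mapping: one queue per delay level
    }

    public static void main(String[] args) {
        Msg retry = new Msg();
        retry.topic = "%RETRY%demo_group"; // hypothetical consumer group
        retry.queueId = 0;
        retry.delayLevel = 3;
        reroute(retry, 18);
        System.out.println(retry.topic + " queue=" + retry.queueId + " " + retry.properties);
    }
}
```

The point of the sketch is only that the retry message is stored as a new message under a different topic, which is why it gets a new commitlog offset.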

Because a retry has to be delayed, the message is rerouted to SCHEDULE_TOPIC (SCHEDULE_TOPIC_XXXX). Next, the part that writes to the commitlog:

// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
String msgId;
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
    msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
    msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}

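Wrong again: the msgId is not reset to originMsgId but computed from the store host and the write offset. In other words, every retry writes the message to the commitlog again, and the msgId changes each time... Now I was deflated. Had I told my colleague the wrong thing? But whenever I debugged issues before, the msgId was clearly the same every time.

Consuming again

No, I had to keep digging. I wondered whether something happens when the message is consumed again. Let's look at the consumer code: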
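
As a rough illustration of why an id built from the store host and the write offset cannot survive a rewrite, here is a small self-contained sketch. It is not the RocketMQ implementation: `createOffsetMsgId` is a hypothetical helper, and the layout (IPv4 address, then port, then offset, hex encoded) is my assumption about the IPv4 case.

```java
import java.nio.ByteBuffer;

class OffsetMsgIdSketch {

    // Hex-encode IPv4 address (4 bytes) + port (4 bytes) + commitlog offset (8 bytes).
    static String createOffsetMsgId(byte[] ipv4, int port, long commitLogOffset) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 8);
        buf.put(ipv4);
        buf.putInt(port);
        buf.putLong(commitLogOffset);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] brokerIp = {10, 0, 0, 1}; // hypothetical broker address
        int brokerPort = 10911;
        // Same broker, but the retry copy is appended at a different offset.
        String first = createOffsetMsgId(brokerIp, brokerPort, 1024L);
        String retry = createOffsetMsgId(brokerIp, brokerPort, 4096L);
        System.out.println(first);
        System.out.println(retry);
        System.out.println("same id? " + first.equals(retry));
    }
}
```

Since the offset is part of the id, two copies of the same logical message written at different positions necessarily get different ids.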

// org.apache.rocketmq.client.consumer.PullCallback#onSuccess
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }

        if (this.hasHook()) {
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        }

        for (MessageExt msg : msgListFilterAgain) {
            String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(traFlag)) {
                msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            }
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
            msg.setBrokerName(mq.getBrokerName());
        }

        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }

    pullResultExt.setMessageBinary(null);

    return pullResult;
}

When I first saw msg.setTransactionId I got my hopes up and went looking for a setMsgId, but there was none.
Could it be handled in the consume service? I kept reading:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run

@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;

    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

When I saw defaultMQPushConsumerImpl.resetRetryAndNamespace I got really excited and opened it up:

final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
    String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
        msg.setTopic(retryTopic);
    }

    if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}

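What is this? Not it either. Only later, when I opened MessageExt and IDEA showed an icon next to it, did I realize something was off: I had probably done a lot of useless work and forgotten about inheritance, one of the three pillars of object-oriented programming in Java.

The answer is hidden in MessageClientExt: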

public class MessageClientExt extends MessageExt {

    public String getOffsetMsgId() {
        return super.getMsgId();
    }

    public void setOffsetMsgId(String offsetMsgId) {
        super.setMsgId(offsetMsgId);
    }

    @Override
    public String getMsgId() {
        String uniqID = MessageClientIDSetter.getUniqID(this);
        if (uniqID == null) {
            return this.getOffsetMsgId();
        } else {
            return uniqID;
        }
    }

    public void setMsgId(String msgId) {
        //DO NOTHING
        //MessageClientIDSetter.setUniqID(this);
    }
}

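WTF. I had been staring at setMsgId, assuming the msgId was being reset somewhere, and it turned out that, damn it, getMsgId was overridden.
Two things follow from this:
OffsetMsgId: the physical msgId tied to the commitLog offset; it changes on every write.
uniqID: the UNIQ_KEY entry in the message properties; this appears to be the one that does not change.

So what exactly is UNIQ_KEY? Here is how it is generated: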
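
The effect of the overridden getter can be reproduced without RocketMQ on the classpath. The sketch below uses simplified stand-in classes (`SketchMessageExt`, `SketchMessageClientExt` and the `delivery` helper are hypothetical names, not the real types) and stores the unique key in a plain map under "UNIQ_KEY".

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for MessageExt: holds the offset-based id assigned on the broker side.
class SketchMessageExt {
    private String msgId;
    final Map<String, String> properties = new HashMap<>();

    public String getMsgId() { return msgId; }
    public void setMsgId(String msgId) { this.msgId = msgId; }
}

// Simplified stand-in for MessageClientExt: only the getter is redirected.
class SketchMessageClientExt extends SketchMessageExt {
    public String getOffsetMsgId() { return super.getMsgId(); }
    public void setOffsetMsgId(String offsetMsgId) { super.setMsgId(offsetMsgId); }

    @Override
    public String getMsgId() {
        // Prefer the client-generated unique key; fall back to the offset-based id.
        String uniqId = properties.get("UNIQ_KEY");
        return uniqId != null ? uniqId : getOffsetMsgId();
    }
}

class MsgIdOverrideDemo {
    // Builds one delivery of a message (hypothetical helper for the demo).
    static SketchMessageClientExt delivery(String offsetMsgId, String uniqKey) {
        SketchMessageClientExt msg = new SketchMessageClientExt();
        msg.setOffsetMsgId(offsetMsgId);
        if (uniqKey != null) {
            msg.properties.put("UNIQ_KEY", uniqKey);
        }
        return msg;
    }

    public static void main(String[] args) {
        // Same logical message, stored twice at different offsets.
        SketchMessageClientExt first = delivery("OFFSET-ID-1", "UNIQ-1");
        SketchMessageClientExt retry = delivery("OFFSET-ID-2", "UNIQ-1");
        System.out.println("getMsgId equal: " + first.getMsgId().equals(retry.getMsgId()));
        System.out.println("getOffsetMsgId equal: " + first.getOffsetMsgId().equals(retry.getOffsetMsgId()));
    }
}
```

Code that only ever calls getMsgId() sees a stable value, which matches what I had observed in the logs, while the offset-based id underneath keeps changing.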

// org.apache.rocketmq.common.message.MessageClientIDSetter
static {
    byte[] ip;
    try {
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 2 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
    tempBuffer.put(ip);
    tempBuffer.putShort((short) UtilAll.getPid());
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

public static String createUniqID() {
    StringBuilder sb = new StringBuilder(LEN * 2);
    sb.append(FIX_STRING);
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
    return sb.toString();
}

private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}

UNIQ_KEY is assembled on the client when the message is first sent to the broker, from the IP, the PID, the class loader's hashCode, a time delta, and an auto-incrementing counter.
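
To see how those five parts line up in the final string, here is a small sketch that builds an id with the same field widths as the excerpt above (IP, 2-byte PID, 4-byte hash, 4-byte time delta, 2-byte counter). `UniqIdLayoutSketch` and its parameters are hypothetical; the inputs are passed in explicitly instead of being read from the machine, so this only illustrates the layout and is not the real generator.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

class UniqIdLayoutSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    // Fixed part (ip, pid, loader hash) followed by the varying part (time delta, counter).
    static String create(byte[] ip, int pid, int loaderHash, long startTime, long now) {
        ByteBuffer buf = ByteBuffer.allocate(ip.length + 2 + 4 + 4 + 2);
        buf.put(ip);
        buf.putShort((short) pid);                        // only the low 16 bits of the pid are kept
        buf.putInt(loaderHash);
        buf.putInt((int) (now - startTime));              // milliseconds since the reference time
        buf.putShort((short) COUNTER.getAndIncrement());  // wraps around after 65536 ids
        StringBuilder sb = new StringBuilder(buf.capacity() * 2);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] ip = {10, 0, 0, 2};   // hypothetical client address
        long start = 1_000_000L;     // hypothetical reference time
        String a = create(ip, 4242, 0x1234ABCD, start, start + 500);
        String b = create(ip, 4242, 0x1234ABCD, start, start + 500);
        // The first 20 hex characters (ip + pid + hash) are shared; only the tail differs.
        System.out.println(a);
        System.out.println(b);
    }
}
```

With an IPv4 address the result is 16 bytes, or 32 hex characters. Nothing in it depends on the broker or the commitlog offset, which is why it can stay the same across retries.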

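Summary

A message's own id is tied to its offset, so every stored copy gets a different one. The msgId we usually work with on the client is a logically unique id assigned by the client when the message is produced, and it stays the same no matter how many times the message is delivered.

Finally

When you're stuck, try taking a bathroom break. 😊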

https://jingzhouzhao.github.io/archives/92571797.html

Author: 太阳当空赵先生
Published: 2022-01-14
Updated: 2022-05-23
