A colleague suddenly asked me: if a RocketMQ message is retried several times, does its msgId change? Even after it has ended up in the DLQ?
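From experience, my first answer was no. Every time I had troubleshot an issue before, searching for a single msgId turned up the logs of every retried consumption. To be sure, I dug into the source code. I read version 4.6.1, partly because that's the version my company uses, and partly because it's the one I dug through last time...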
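Message retry

Since the question is about retries, let's start from the client's consume-failure logic and look for clues. Here is the code that sends a failed message back to the broker: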
```java
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
case CLUSTERING:
    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
    // every message after ackIndex is treated as failed and sent back to the broker
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        MessageExt msg = consumeRequest.getMsgs().get(i);
        boolean result = this.sendMessageBack(msg, context);
        if (!result) {
            // send-back failed: bump the retry count and collect the message
            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            msgBackFailed.add(msg);
        }
    }
```
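The loop above can be modeled in isolation. Below is a minimal, self-contained sketch; `SendBackLoopSketch`, `Msg` and the `sendMessageBack` predicate are hypothetical stand-ins for `ConsumeMessageConcurrentlyService`, `MessageExt` and the real network call. Everything after `ackIndex` is sent back, and only the messages whose send-back fails get their retry count bumped and are collected. In the real client a `RECONSUME_LATER` result sets `ackIndex` to -1, so the whole batch goes back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the CLUSTERING branch shown above, not RocketMQ code.
final class SendBackLoopSketch {

    // Minimal stand-in for MessageExt: just an id and the retry counter.
    static final class Msg {
        final String id;
        int reconsumeTimes;

        Msg(String id) {
            this.id = id;
        }
    }

    // Sends back every message after ackIndex; returns those whose send-back failed.
    static List<Msg> sendBackAfterAck(List<Msg> msgs, int ackIndex, Predicate<Msg> sendMessageBack) {
        List<Msg> msgBackFailed = new ArrayList<>(msgs.size());
        for (int i = ackIndex + 1; i < msgs.size(); i++) {
            Msg msg = msgs.get(i);
            if (!sendMessageBack.test(msg)) {
                // mirrors msg.setReconsumeTimes(msg.getReconsumeTimes() + 1)
                msg.reconsumeTimes++;
                msgBackFailed.add(msg);
            }
        }
        return msgBackFailed;
    }
}
```

For example, with a batch `a, b, c`, `ackIndex = 0` and a send-back that fails only for `c`, the method returns `[c]` and only `c` has its counter bumped.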
So a failed consumption ends up calling sendMessageBack:
```java
// call chain:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack
//   -> org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
//     -> org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

RemotingCommand response = this.remotingClient.invokeSync(
    MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
```
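The client sends the broker a RequestCode.CONSUMER_SEND_MSG_BACK request carrying the commitlog offset, originMsgId and a few other fields. At first I thought originMsgId was the answer: the client sends the original msgId back, so of course the msgId stays the same. That turned out not to be the case. On to the broker side: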
```java
// Broker: handling CONSUMER_SEND_MSG_BACK (abridged)
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// look up the stored message by its commitlog offset
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
// keep an existing ORIGIN_MESSAGE_ID; otherwise fall back to the stored message's msgId
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
```
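The originMsgId handling in this excerpt boils down to "keep the first id ever recorded". Here is a minimal sketch with plain maps. `OriginMsgIdSketch` is hypothetical, and the property name `ORIGIN_MESSAGE_ID` is an assumption about the value of `MessageConst.PROPERTY_ORIGIN_MESSAGE_ID`. The sketch also assumes the stored message's properties are copied onto the retry message, which is worth confirming in the part of the broker method that was cut.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the ORIGIN_MESSAGE_ID rule; not the broker's actual code.
final class OriginMsgIdSketch {

    // Assumed value of MessageConst.PROPERTY_ORIGIN_MESSAGE_ID.
    static final String ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";

    // Rough equivalent of UtilAll.isBlank: null, empty or whitespace-only.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Builds the properties of the retry copy from the stored message.
    // storedOffsetMsgId plays the role of msgExt.getMsgId() on the broker.
    static Map<String, String> retryProperties(Map<String, String> storedProps, String storedOffsetMsgId) {
        Map<String, String> props = new HashMap<>(storedProps); // assumption: properties are carried over
        String originMsgId = storedProps.get(ORIGIN_MESSAGE_ID);
        props.put(ORIGIN_MESSAGE_ID, isBlank(originMsgId) ? storedOffsetMsgId : originMsgId);
        return props;
    }
}
```

After the first retry, ORIGIN_MESSAGE_ID holds the first physical id, and later retries keep it unchanged. It is only a property, though, not the msgId itself.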
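I've trimmed the broker handler above because it's long. Notice that the originMsgId sent in the request isn't used at all. Instead, the broker locates the message directly by offset and takes its msgId (or, if this isn't the first retry, the PROPERTY_ORIGIN_MESSAGE_ID already stored in its properties). But that only sets a property; it doesn't touch msgId. My next guess was that msgId gets reset to originMsgId when the message is written to the commitlog, so I kept reading: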
```java
// commitlog write path (abridged)
if (msg.getDelayTimeLevel() > 0) {
    // clamp the delay level to the maximum supported level
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }

    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // remember the real topic and queue so the message can be restored after the delay
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

// ....

result = mappedFile.appendMessage(msg, this.appendMessageCallback);
```
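Stripped of the store internals, this branch clamps the delay level, stashes the real topic and queue in properties, and re-targets the message at the schedule topic. The sketch below is hypothetical. The property names `REAL_TOPIC` / `REAL_QID` and the level-to-queue mapping `delayLevel - 1` are my assumptions about 4.6.1, not something shown in the excerpt.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the delay branch above; not the CommitLog implementation.
final class ScheduleRedirectSketch {

    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Assumed values of MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    // Minimal stand-in for the message being stored.
    static final class StoredMsg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        StoredMsg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Assumption: delay level N is stored in queue N - 1 of the schedule topic.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void redirectIfDelayed(StoredMsg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, stored as-is
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp, like the excerpt
        }
        // remember where the message really belongs
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }
}
```

Whichever topic it lands in, the message is appended to the commitlog again, and that is what matters for msgId.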
Because a retry has to be delayed, the message is redirected into SCHEDULE_TOPIC (SCHEDULE_TOPIC_XXXX). Next, the part that actually writes to the commitlog:
```java
// computing msgId while appending to the commitlog (abridged)
String msgId;
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
    msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
    msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
```
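Wrong again. msgId is not reset to originMsgId; it is computed from the store host and the write offset. In other words, every retry writes the message to the commitlog again, and every write produces a new msgId... That knocked the wind out of me. Had I given my colleague the wrong answer? But when I troubleshot issues before, the msgId really was the same every time!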
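The offset-based id is easy to reproduce for the IPv4 case. The sketch below assumes the 16-byte layout I believe `MessageDecoder.createMessageId` uses there: a 4-byte store-host IP, a 4-byte port and an 8-byte commitlog offset, rendered as uppercase hex. `OffsetMsgIdSketch` and the host `10.0.0.1:10911` are hypothetical. Same host and port with a different offset gives a different id:

```java
import java.nio.ByteBuffer;

// Hypothetical re-implementation of the IPv4 offset msgId layout, for illustration only.
final class OffsetMsgIdSketch {

    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    // Uppercase hex encoding of a byte array.
    static String toHex(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int v = bytes[i] & 0xFF;
            out[i * 2] = HEX[v >>> 4];
            out[i * 2 + 1] = HEX[v & 0x0F];
        }
        return new String(out);
    }

    // ip: 4-byte IPv4 address of the store host; port: broker listen port;
    // wroteOffset: physical offset where the message was appended in the commitlog.
    static String createMessageId(byte[] ip, int port, long wroteOffset) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 8); // big-endian by default
        buffer.put(ip);
        buffer.putInt(port);
        buffer.putLong(wroteOffset);
        return toHex(buffer.array());
    }
}
```

Every retry, and every trip through SCHEDULE_TOPIC, appends the message at a new offset, so this id necessarily changes.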
Consuming again

I couldn't leave it there. Maybe something happens when the message is consumed again? Let's look at the consumer code:
```java
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }

        if (this.hasHook()) {
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        }

        for (MessageExt msg : msgListFilterAgain) {
            String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(traFlag)) {
                msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            }
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
            msg.setBrokerName(mq.getBrokerName());
        }

        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }

    pullResultExt.setMessageBinary(null);

    return pullResult;
}
```
When I spotted msg.setTransactionId I got excited for a moment and went looking for a setMsgId, but there wasn't one. Maybe it's handled in the consume service? On I went:
```java
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
    // ... (rest of run() omitted)
}
```
When I saw defaultMQPushConsumerImpl.resetRetryAndNamespace I got really excited and opened it up:
```java
// DefaultMQPushConsumerImpl#resetRetryAndNamespace (method body)
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
    // a retried message arrives on the group's retry topic; restore its original topic
    String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
        msg.setTopic(retryTopic);
    }

    // strip the namespace from the topic if a namespace is configured
    if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}
```
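What is this? Not it either. Then I stepped into MessageExt, and IDEA showed a gutter icon hinting that the class had subclasses. That's when I realized something was off: I had probably been wasting my time, having forgotten inheritance, one of the three pillars of object-oriented programming in Java. The answer was hiding in MessageClientExt: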
```java
public class MessageClientExt extends MessageExt {

    // the "physical" id stored in MessageExt, derived from the commitlog offset
    public String getOffsetMsgId() {
        return super.getMsgId();
    }

    public void setOffsetMsgId(String offsetMsgId) {
        super.setMsgId(offsetMsgId);
    }

    // getMsgId() is overridden: prefer the producer-assigned unique id (UNIQ_KEY)
    @Override
    public String getMsgId() {
        String uniqID = MessageClientIDSetter.getUniqID(this);
        if (uniqID == null) {
            return this.getOffsetMsgId();
        } else {
            return uniqID;
        }
    }

    // deliberately a no-op
    public void setMsgId(String msgId) {
    }
}
```
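WTF. I had been staring at setMsgId, assuming something was resetting msgId somewhere, when all along it was the damn getMsgId that had been overridden. This tells us there are two ids:

- offsetMsgId: the physical msgId tied to the commitlog offset; it changes every time the message is rewritten.
- uniqID: the UNIQ_KEY entry in the message properties; this is the one that stays the same.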
So what exactly is UNIQ_KEY? Here's how it's generated:
```java
// MessageClientIDSetter
static {
    byte[] ip;
    try {
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 2 + 4 + 4 + 2;
    // fixed prefix: IP + PID + ClassLoader hashCode
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
    tempBuffer.put(ip);
    tempBuffer.putShort((short) UtilAll.getPid());
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

public static String createUniqID() {
    StringBuilder sb = new StringBuilder(LEN * 2);
    sb.append(FIX_STRING);
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
    return sb.toString();
}

// variable suffix: time elapsed since startTime + an incrementing counter
private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}
```
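UNIQ_KEY is generated on the producer side when the message is first sent to the broker. It is built from the IP, the PID, the ClassLoader's hashCode, the time elapsed since startTime, and an incrementing counter.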
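To see the layout concretely, here is a deterministic sketch of the same composition with the inputs passed in explicitly. `UniqIdSketch` is hypothetical. Unlike the real `MessageClientIDSetter`, it takes the IP, PID, hash and start time as parameters and never resets `startTime`. For an IPv4 address the id is 4 + 2 + 4 + 4 + 2 = 16 bytes, i.e. 32 hex characters:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, deterministic version of the UNIQ_KEY layout shown above.
final class UniqIdSketch {

    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    private final String fixString; // hex(IP + PID + ClassLoader hashCode)
    private final long startTime;   // reference point for the time delta
    private final AtomicInteger counter = new AtomicInteger(0);

    UniqIdSketch(byte[] ip, int pid, int classLoaderHash, long startTime) {
        ByteBuffer fixed = ByteBuffer.allocate(ip.length + 2 + 4);
        fixed.put(ip);
        fixed.putShort((short) pid);
        fixed.putInt(classLoaderHash);
        this.fixString = toHex(fixed.array());
        this.startTime = startTime;
    }

    // Fixed prefix + 4-byte time delta + 2-byte counter.
    String next(long nowMillis) {
        ByteBuffer variable = ByteBuffer.allocate(4 + 2);
        variable.putInt((int) (nowMillis - startTime));
        variable.putShort((short) counter.getAndIncrement());
        return fixString + toHex(variable.array());
    }

    // Uppercase hex encoding of a byte array.
    static String toHex(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int v = bytes[i] & 0xFF;
            out[i * 2] = HEX[v >>> 4];
            out[i * 2 + 1] = HEX[v & 0x0F];
        }
        return new String(out);
    }
}
```

Because the id is computed once by the producer and travels as a message property, nothing on the retry path above has to recompute it.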
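Summary

A message's own id is derived from its commitlog offset, so every stored copy gets a different one. The msgId we normally use on the client, however, is assigned by the producer when the message is created, and it is logically unique no matter how many times the message is delivered.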
Finally

When you're stuck, go take a bathroom break. 😊