RocketMQ RMQ_SYS_TRANS_HALF_TOPIC 爆掉的问题
现象
SaaS项目东郭反应,项目中发的事务消息一直在RMQ_SYS_TRANS_HALF_TOPIC
中,并且不断增长。随即我们查看RocketMQ日志发现如下情况:
这个本来是RocketMQ正常的逻辑,发送事务消息后没有提交状态的话,当达到超时时间后,RocketMQ会回查本地事务状态。这里显示的是回查的次数超限,消息被移到了TRANS_CHECK_MAXTIME_TOPIC
中。
不正常的是REAL_TOPIC
变成了RMQ_SYS_TRANS_HALF_TOPIC
,正常应该是原始的业务消息TOPIC才对。于是我们带着这个问题开始排查起来。
追踪
一、排查
一开始我们以为是Producer的问题,因为得到的反馈是这个消息没有被“消费”,所以我们开始排查Producer所在的项目。发现并没有什么问题。后来我们观察到上述日志中有一个DELAY=3,结合在网上查询的资料,认为可能是触发了RocketMQ的一个问题,就是事务消息进行了延迟发送。我们以为快接近真相了,我们开始查找Producer是否在发送事务消息时设置了DELAY参数。很快我们就失望了,Producer没有任何地方设置了DELAY参数。
二、翻阅源码
我们回头去看上面那个日志,发现RECONSUME_TIME=1
并且RETRY_TOPIC
也不为空,这说明这个消息肯定是被消费者消费到了,但是由于某种原因消费失败了,触发了重试。于是我们开始看RocketMQ重试相关源码。我们首先找到了DELAY=3这个参数的来源:
1 | //在我们的消费者中设置了ConsumeConcurrentlyContext 延时级别 |
我们找到对应broker版本(4.6.0)的源码一步步找到了这里:
1 | //org.apache.rocketmq.store.CommitLog#putMessage |
对照之前日志里的参数,sysFlag=8,delay=3
,我们认为很可能走了这段逻辑,然后触发了事务消息进行了延迟发送的问题。继续看了延时消息发送的逻辑,没有找到问题,而且这也解释不了为什么REAL_TOPIC
变成了``RMQ_SYS_TRANS_HALF_TOPIC`
三、真相
上面还提到了一个RETRY_TOPIC
,这个在之前的排查过程中没有发现有什么地方设置,于是我们搜索了一把,发现在这里:
1 | //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack |
很明显是个异常流程,那么到底是什么导致了这个异常流程呢?既然知道是在消费的时候出了异常,于是我们找到对应的消费者日志,发现如下错误:
这里Broker返回的错误是MESSAGE_ILIEGAL
,在回过头去看重试相关代码,有了之前的经验这次很快就定位到了可能报这个错误的地方:
1 | //第一处:org.apache.rocketmq.store.DefaultMessageStore#putMessage |
果断找运维要了store.log
,发现如下错误:
1 | WARN SendMessageThread_1 - putMessage message topic length too long 149 |
所以应该就是第一处的问题了。至此问题的原因基本找到了,但还有以下问题:
为什么TOPIC会超长?
重试的消息TOPIC规则为%RETRY%+consumerGroup:
1
MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup())
而我们的consumerGroup的规则为:
1
2
3public String getGroup() {
return group + "_" + getTopic() + "_" + getTags();
}有问题的consumer的tags为:
SAAS_PURCHASE_PURCHASE_ORDER_UPDATE||SAAS_PURCHASE_PURCHASE_ORDER_OPEN_RECEVIED||SAAS_PURCHASE_PURCHASE_ORDER_COMPLETED_RECEVIED
所以最终拼出来的TOPIC超出了长度。
为什么走了Catch里面的流程就会导致HALF队列爆掉?
这是由于我们使用的RocketMQ-client版本为
4.5.0
,这个版本Catch里的代码有个bug,没有清除掉原始消息的事务消息标志TRANS_MSG=true
。所以这个消息发出去后在broker端又会走事务消息的流程,并且还是带延时的。这会导致真实的TOPIC丢掉。下面用一张图来说明一下:
事务消息消费失败,topic转为
%RETRY%xxxx
发送到broker,由于事务消息标志没有被清除,于是topic转成了RMQ_SYS_TRANS_HALF_TOPIC
。又由于delay参数没有被清除,topic最后被转为了schedule_topic_xxxx
。等到schedule执行时,消息会发到RMQ_SYS_TRANS_HALF_TOPIC
中。由于不是Producer发的事务消息,所以拿不到LocalTransactionState。只能等待事务消息回查。这时刚好又碰到我们的LocalTransactionListener的一个问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
Object msg = HessianUtils.decode(messageExt.getBody());
String topic = messageExt.getTopic();
String tag = messageExt.getTags();
//由于拿着rmq_sys_trans_half_topic来获取handler所以肯定获取不到。
LocalTransactionHandler localTransacationHandler = getLocalTransacationHandler(topic, tag);
String msgId = messageExt.getMsgId();
if( localTransacationHandler == null ){
logger.error("localTransacationHandler is empty should never happened! msgId={}, arg={}", msgId, JSON.toJSONString(msg));
//于是这个地方会返回COMMIT_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
}
boolean checkResult = false;
try {
checkResult = localTransacationHandler.localTransactionCheck(msgId, msg);
} catch (Exception e) {
logger.error("localTransacationCheck failed! msgId={}, arg={}", msgId, JSON.toJSONString(msg), e);
checkResult = false;
}
return checkResult ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}Broker收到
COMMIT_MESSAGE
后会将消息写往REAL_TOPIC中。而此时REAL_TOPIC早已变成了RMQ_SYS_TRANS_HALF_TOPIC
。就这样,RMQ_SYS_TRANS_HALF_TOPIC
爆掉了。
解决
首先我们将handler为空时返回
COMMIT_MESSAGE
,改为了ROLLBACK_MESSAGE
。升级stone中RocketMQ为4.6.1,可以看到在Catch代码中多了一行:
1
MessageAccessor.clearProperty(newMsg, "TRAN_MSG");
有问题的consumer将tag拆分
第一步和第二步只能解决
RMQ_SYS_TRANS_HALF_TOPIC
爆掉的问题。但是topic超长还是会有问题。所以目前暂时是将consumer的tag拆开。
RocketMQ RMQ_SYS_TRANS_HALF_TOPIC 爆掉的问题