RocketMQ RMQ_SYS_TRANS_HALF_TOPIC 爆掉的问题

现象

SaaS项目东郭反应,项目中发的事务消息一直在RMQ_SYS_TRANS_HALF_TOPIC中,并且不断增长。随即我们查看RocketMQ日志发现如下情况:

这个本来是RocketMQ正常的逻辑,发送事务消息后没有提交状态的话,当达到超时时间后,RocketMQ会回查本地事务状态。这里显示的是回查的次数超限,消息被移到了TRANS_CHECK_MAXTIME_TOPIC中。

不正常的是REAL_TOPIC变成了RMQ_SYS_TRANS_HALF_TOPIC,正常应该是原始的业务消息TOPIC才对。于是我们带着这个问题开始排查起来。

追踪

一、排查

一开始我们以为是Producer的问题,因为得到的反馈是这个消息没有被“消费”,所以我们开始排查Producer所在的项目。发现并没有什么问题。后来我们观察到上述日志中有一个DELAY=3,结合在网上查询的资料,认为可能是触发了RocketMQ的一个问题,就是事务消息进行了延迟发送。我们以为快接近真相了,我们开始查找Producer是否在发送事务消息时设置了DELAY参数。很快我们就失望了,Producer没有任何地方设置了DELAY参数。

二、翻阅源码

我们回头去看上面那个日志,发现RECONSUME_TIME=1并且RETRY_TOPIC也不为空,这说明这个消息肯定是被消费者消费到了,但是由于某种原因消费失败了,触发了重试。于是我们开始看RocketMQ重试相关源码。我们首先找到了DELAY=3这个参数的来源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//在我们的消费者中设置了ConsumeConcurrentlyContext 延时级别
context.setDelayLevelWhenNextConsume(getDelayLevelWhenNextConsume(reconsumeTimes));
//可以看到这个delayLevel最终是会被发送到broker
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();

// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}

return false;
}

我们找到对应broker版本(4.6.0)的源码一步步找到了这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//org.apache.rocketmq.store.CommitLog#putMessage
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

对照之前日志里的参数,sysFlag=8,delay=3,我们认为很可能走了这段逻辑,然后触发了事务消息进行了延迟发送的问题。继续看了延时消息发送的逻辑,没有找到问题,而且这也解释不了为什么REAL_TOPIC变成了``RMQ_SYS_TRANS_HALF_TOPIC`

三、真相

上面还提到了一个RETRY_TOPIC,这个在之前的排查过程中没有发现有什么地方设置,于是我们搜索了一把,发现在这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
//这里设置了RETRY_TOPIC
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}

很明显是个异常流程,那么到底是什么导致了这个异常流程呢?既然知道是在消费的时候出了异常,于是我们找到对应的消费者日志,发现如下错误:

这里Broker返回的错误是MESSAGE_ILIEGAL,在回过头去看重试相关代码,有了之前的经验这次很快就定位到了可能报这个错误的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
//第一处:org.apache.rocketmq.store.DefaultMessageStore#putMessage 
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
//第二处:org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);

果断找运维要了store.log,发现如下错误:

1
WARN SendMessageThread_1 - putMessage message topic length too long 149

所以应该就是第一处的问题了。至此问题的原因基本找到了,但还有以下问题:

  1. 为什么TOPIC会超长?

    重试的消息TOPIC规则为%RETRY%+consumerGroup:

    1
    MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup())

    而我们的consumerGroup的规则为:

    1
    2
    3
    public String getGroup() {
    return group + "_" + getTopic() + "_" + getTags();
    }

    有问题的consumer的tags为:SAAS_PURCHASE_PURCHASE_ORDER_UPDATE||SAAS_PURCHASE_PURCHASE_ORDER_OPEN_RECEVIED||SAAS_PURCHASE_PURCHASE_ORDER_COMPLETED_RECEVIED

    所以最终拼出来的TOPIC超出了长度。

  2. 为什么走了Catch里面的流程就会导致HALF队列爆掉?

    这是由于我们使用的RocketMQ-client版本为4.5.0,这个版本Catch里的代码有个bug,没有清除掉原始消息的事务消息标志TRANS_MSG=true。所以这个消息发出去后在broker端又会走事务消息的流程,并且还是带延时的。这会导致真实的TOPIC丢掉。

    下面用一张图来说明一下:

    事务消息消费失败,topic转为%RETRY%xxxx发送到broker,由于事务消息标志没有被清除,于是topic转成了RMQ_SYS_TRANS_HALF_TOPIC。又由于delay参数没有被清除,topic最后被转为了schedule_topic_xxxx。等到schedule执行时,消息会发到RMQ_SYS_TRANS_HALF_TOPIC中。由于不是Producer发的事务消息,所以拿不到LocalTransactionState。只能等待事务消息回查。

    这时刚好又碰到我们的LocalTransactionListener的一个问题:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    Object msg = HessianUtils.decode(messageExt.getBody());
    String topic = messageExt.getTopic();
    String tag = messageExt.getTags();
    //由于拿着rmq_sys_trans_half_topic来获取handler所以肯定获取不到。
    LocalTransactionHandler localTransacationHandler = getLocalTransacationHandler(topic, tag);

    String msgId = messageExt.getMsgId();
    if( localTransacationHandler == null ){
    logger.error("localTransacationHandler is empty should never happened! msgId={}, arg={}", msgId, JSON.toJSONString(msg));
    //于是这个地方会返回COMMIT_MESSAGE
    return LocalTransactionState.COMMIT_MESSAGE;
    }
    boolean checkResult = false;
    try {
    checkResult = localTransacationHandler.localTransactionCheck(msgId, msg);
    } catch (Exception e) {
    logger.error("localTransacationCheck failed! msgId={}, arg={}", msgId, JSON.toJSONString(msg), e);
    checkResult = false;
    }

    return checkResult ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    Broker收到COMMIT_MESSAGE后会将消息写往REAL_TOPIC中。而此时REAL_TOPIC早已变成了RMQ_SYS_TRANS_HALF_TOPIC。就这样,RMQ_SYS_TRANS_HALF_TOPIC 爆掉了。

解决

  1. 首先我们将handler为空时返回COMMIT_MESSAGE,改为了ROLLBACK_MESSAGE

  2. 升级stone中RocketMQ为4.6.1,可以看到在Catch代码中多了一行:

    1
    MessageAccessor.clearProperty(newMsg, "TRAN_MSG");
  3. 有问题的consumer将tag拆分

    第一步和第二步只能解决RMQ_SYS_TRANS_HALF_TOPIC爆掉的问题。但是topic超长还是会有问题。所以目前暂时是将consumer的tag拆开。

RocketMQ RMQ_SYS_TRANS_HALF_TOPIC 爆掉的问题

https://jingzhouzhao.github.io/archives/cfa05355.html

作者

太阳当空赵先生

发布于

2020-06-05

更新于

2022-05-23

许可协议

评论