RocketMQ并发消费两次重试为什么会隔10个小时

前天发完版本好不容易休息一下,又遇到一个问题,项目组反馈说RocketMQ的一个消费组一条消息,消费了两次,但两者之间的间隔超过了10个小时,现象如下图所示:

这是为什么呢?两者之间相差了差不多10个多小时,是不是这条消息重复消费了16次,但从日志中并没有打印出16次消息题,只打印了两条消息,从日志角度来看应该不是重复消费了16次。

本着不轻易相信日志的原则,我觉得应该去RocketMQ服务器上看看这条消息存储的重试次数,从而推断出消息是否消费失败而进行消费重试,正好项目组也提供了消息的Key,根据消息key、发送主题名称去查找消息,只能查询到一条消息:

说明发送端确实只发送了一条消息。

那个时候服务集群并没有发现什么异常,消费者没有重启、队列也没发生重平衡,不符合RocketMQ会重复推送消息给客户端的场景,那基本就可以断定是消息消费失败重启引起的,但RocketMQ消息消费重试重试延迟是采取间隔的,往往第一次重试只需10秒就会发生重试,不应该是10个小时?

为此,我特意根据Key分别查找了主题SCHEDULE_TOPIC_XXXX与%RETRY%重试主题,发现这个key也只有一条消息,说明这还仅仅是第一次重试,如下图:

但从这里可以看到消息写入到SCHEDULE_TOPIC_XXXX的时间为2022-05-27 22:26:20,然后过了10s就通过调度机制进入到重试主题,并开始被消费,故进行了第一次重试。

扩展机制知识:RocketMQ并发消费的消费者,在客户端消费失败后会向服务端发送ACK,根据重试次数进入到SCHEDULE_TOPIC_XXXX主题不同的队列中,每一个队列代表的延迟时间不一样,经过一定延迟时间后再次调度到消费组的重试主题,被消费者再次消费,实现时间间隔的重试,提高重试成功率。

那为什么第一次消费是12点51分,为什么这么久才进入到Broker的SCHEDULE_TOPIC_XXXXn呢?

这个就是问题的关键所在,这个细节我平时也没关注,故接下来得分析这段代码,代码具体定义在ConsumeMessageConcurrentlyService的processConsumeResult方法中。

 switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

原来,如果客户端消费失败后向Broker发生ACK失败后,会加入到msgBack失败队列中,并重新提交到消费者这边消费,并且这条消息的位点不会提交,因为有关键代码:

consumeRequest.getMsgs().removeAll(msgBackFailed);

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
   this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

RocketMQ消费位点采取最小位点提交,只要消息存在于本地处理队列,位点就不会提交,从而会触发消息积压。但尴尬的是这个主题平时消息量很少,并没有通过积压来发现该问题。

根据现象与代码结合,原因是客户端一直消费失败,但向Broker提交ACK一直失败,直到晚上22:26分才发送成功,从而才触发重新消费。

那现在的关键是为什么会发生ACK会失败呢?这次比较遗憾,因为项目组使用的是容器,这块代码并没有采集到日志集群,导致无法查看错误日志,而且所在的集群是正常的,这个疑问后续等我分析完毕后,再和大家来分享,如果粉丝朋友们遇到过,后者分析过,还请留言。


见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技术内幕》一书,在CSDN中记录了我的成长历程,欢迎大家关注,私信,一起交流进步。

分享笔者一个硬核的RocketMQ电子书: 在这里插入图片描述

获取方式:扫描如下二维码,关注【中间件兴趣圈】,回复RMQPDF即可获取。

在这里插入图片描述

版权信息:本文由中间件兴趣圈创作

禁止非授权转载,违着依法追究相关法律责任

如需转载,请联系 codingw@126.com