问题排查|为啥RocketMQ广播消费每次启动都会从头开始消费?

1、现象

最近听到有项目反馈,使用公司内部封装的消息sdk,在使用RocketMQ广播消费时,每一次版本发布,消费者重启后,都会从最早位点开始消费,造成严重的消息消费积压,不仅对消息服务的负载产生极大影响,也会让发版本后消息出来不及时。

那这是什么原因引起的呢?

2、问题排查

熟悉RocketMQ的小伙伴应该知道,消费组在重启时优先会去查询上一次的消费位点,只要能查询到有效的位点,则会从查询到的位点开始消费,如果查询不到有效位点,则按照消费组设置的ConsumeFromWhere策略去查询位点,其可选值:

  • CONSUME_FROM_FIRST_OFFSET 从最早位点开始消费

  • CONSUME_FROM_LAST_OFFSET

    从最新位点开始消费

  • CONSUME_FROM_TIMESTAMP 从指定时间开始消费

现在的现象是从最早消费,基本可以认为是消费者在启动时并没有查询到有效位点,但RocketMQ广播消费一样会存储消费进度,只是存储在消费端本地,那为什么会读取不到消费进度文件呢?

注意:如果在容器环境中使用RocketMQ的广播消费,进度文件不能存储在容器本身,应该需要引用外部存储文件,即容器创建后,可以统一访问该共享目录,否则肯定会出现本文出现的问题。

为了破解该问题,首先我们要知道RocketMQ广播消费消费进度文件的存储目录,由于本文遇到的环境是虚拟机环境,结合RocketMQ广播消费的核心实现类LocalFileOffsetStore中如下代码:

01
01

结合上述两段核心代码,我们可以得知RocketMQ广播消费模式,消息消费进度文件的存储根目录为 /{用户主目录}/.rocketmq_offsets,子目录为/mqClientId/{consumerGroupName}/offsets.json。其中mqClient为消费者clientId,consumerGroupName为消费最名称。

于是立马登录服务器,进入到.rocketmq_offsets文件夹,看到下面存在很多子文件夹,如下图所示:

02
02

不进引起了我的注意,按照上述的定义,一个消费者一个客户端拥有自己的专属文件夹,从这里看,显然是消费组在启动时就会创建一个新的文件夹,故加载不到之前存储的消费进度信息,综合来看,问题就出在mqClientId上,也就是这个mqClinetId并不唯一,每一次启动时会变化,最终查看内部代码,发现消费组在构建mqClientID时确实存在问题,因为会取当前时间戳,代码如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yFt6TyLw-1671359950905)(/Users/codingw/Library/Application Support/typora-user-images/image-20221218175405137.png)]

破案了。那原生使用RocketMQ的广播消费,是否会存在同样的问题呢?答案是不会,因为clientId的默认生成规则如下所示:

04
04

也就是默认的mqClientId是IP地址 + @ DEFAULT,这个无论启动多少次,也不会变化。


见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技术内幕》、《RocketMQ实战》两本书的作者,在CSDN中记录了我的成长历程,欢迎大家关注,私信,一起交流进步。

分享笔者一个硬核的RocketMQ电子书: 在这里插入图片描述

获取方式:扫描如下二维码,关注【中间件兴趣圈】,回复RMQPDF即可获取。

在这里插入图片描述

阅读更多

版权信息:本文由中间件兴趣圈创作

禁止非授权转载,违着依法追究相关法律责任

如需转载,请联系 codingw@126.com