2分钟看懂RocketMQ延迟消息核心原理

2023年 9月 27日 42.9k 0

前言

延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现。本文将从源码层面来分析Rocketmq的延迟消息实现原理机制。

一、延迟消息的使用            

先看下延迟消息的使用,发送消息逻辑和普通消息一样,只要在生产者端将Message对象中设置延迟消息的等级,Rocketmq的开源版本支持18个等级,每个等级代表一个延迟时间。

Rocketmq有18个延迟等级分别对应如下延迟时间最小1s,最大2h,客户端从等级从1开始,上面设置为2,代表延迟5s。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

三、broker端原理解析

  • 配置解析与初始化定时任务

broker端分析,在Broker服务启动的时候,会初始化延迟消息的配置解析和对延迟消息处理的定时任务的开启。

brocker启动时,根据不同等级开启定时器,即分别对每个延迟等级都开启一个定时器,rocketmq是基于java.util.Timer的定时器。

源码中单独开启了一个定时任务来持久化对延迟等级和offset的关系。

为什么需要持久化延迟等级和offset的关系呢?

offset是每次写入消息都会实时更新,下次直接在新的offset写消息即可,持久化可以保证brocker重启后offset不丢失,实现高可用机制,并且高性能(避免重新计算)。 

  • Broker接收到生产者延迟消息处理

下面broker接收到延迟消息的处理逻辑。会将生产者提供的真实的topic和队列进行备份暂存,然后将消息存储到内置的一个延迟消息topic队列中。

image.png

  • 定时将时间到的延迟消息写入真实topic和队列

延迟消息处理定时任务类DeliverDelayedMessageTimerTask,每次都将当前延迟等级对应的消息,写入真实的topic和队列,并维护新的offset,然后重新开启定时任务,等下次再重新处理该延迟等级的消息。

如果时间到了,他的处理核心逻辑比较清晰

1、 首先是还原延迟消息真实topic和队列

image.png

2、将消息写入commitlog

//将消息写入真实的topic和队列
PutMessageResult putMessageResult =
    ScheduleMessageService.this.writeMessageStore
        .putMessage(msgInner);

当消息写入真实的topic和队列中后,消费者就可以正常的消费到消息了。

整理下来,总的核心流程如下:

image.png
        

总结

哈哈哈,上面的内容就是Rocketmq延迟消息原理了,马上过节了,祝大家中秋国庆节快乐,天天happy。

image.png

image.png

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论