前言
延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现。本文将从源码层面来分析Rocketmq的延迟消息实现原理机制。
一、延迟消息的使用
先看下延迟消息的使用,发送消息逻辑和普通消息一样,只要在生产者端将Message对象中设置延迟消息的等级
,Rocketmq的开源版本支持18个等级,每个等级代表一个延迟时间。
Rocketmq有18个延迟等级分别对应如下延迟时间
,最小1s,最大2h
,客户端从等级从1开始,上面设置为2,代表延迟5s。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
三、broker端原理解析
- 配置解析与初始化定时任务
broker端分析,在Broker服务启动的时候,会初始化延迟消息的配置解析和对延迟消息处理的定时任务的开启。
brocker启动时,根据不同等级开启定时器,即分别对每个延迟等级都开启一个定时器,rocketmq是基于java.util.Timer的定时器。
源码中单独开启了一个定时任务来持久化对延迟等级和offset的关系。
为什么需要持久化延迟等级和offset的关系呢?
offset是每次写入消息都会实时更新,下次直接在新的offset写消息即可,持久化可以保证brocker重启后offset不丢失,实现高可用机制,并且高性能(避免重新计算)。
- Broker接收到生产者延迟消息处理
下面broker接收到延迟消息的处理逻辑。会将生产者提供的真实的topic和队列进行备份暂存
,然后将消息存储到内置的一个延迟消息topic队列中。
- 定时将时间到的延迟消息写入真实topic和队列
延迟消息处理定时任务类DeliverDelayedMessageTimerTask
,每次都将当前延迟等级对应的消息,写入真实的topic和队列,并维护新的offset,然后重新开启定时任务,等下次再重新处理该延迟等级的消息。
如果时间到了,他的处理核心逻辑比较清晰
1、 首先是还原延迟消息真实topic和队列
2、将消息写入commitlog
//将消息写入真实的topic和队列
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
当消息写入真实的topic和队列中后,消费者就可以正常的消费到消息了。
整理下来,总的核心流程如下:
总结
哈哈哈,上面的内容就是Rocketmq延迟消息原理了,马上过节了,祝大家中秋国庆节快乐,天天happy。