前言
工作中很多种场景下会用到消息队列,消息队列简单来说就是 消息的传输过程中保存消息的容器。消息队列主要解决了应用耦合、异步处理、流量削峰等问题。今天我们来了解一下阿里开源的一款产品 RocketMQ。
RocketMQ简介
RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。
NameServer
NameServer 是整个 RocketMQ 的“大脑”,是 RocketMQ 的 路由中心。NameServer 的主要作用是为消息生产者和消息消费者提供有关 Topic 的路由信息,所以 NameServer 就需要存储路由信息,并且能够管理 Broker 节点,包括路由注册、路由删除等功能。
路由中心高可用
“大脑”一旦故障,那可不是闹着玩的,那么必然要有对策来解决。NameServer 的 高可用可以通过部署多台 NameServer 服务器来实现,但彼此之间互不通信。虽然 NameServer 服务器之间在某一时刻的数据并不会完全相同,但对消息发送不会在成重大影响,无非就是短暂造成消息发送不均衡(是不是有很熟悉的味道呢?没错CAP理论,这不就是AP嘛)。RocketMQ 在 NameServer 这个模块的设计上选择了 AP。
元数据存储
既然是路由中心,那么路由信息是如何存储的呢?我们来看一下RouteInfoManager
这个类。
public class RouteInfoManager {
// Topic消息队列的路由信息
private final HashMap topicQueueTable;
// Broker的基础信息
private final HashMap brokerAddrTable;
// Broker的集群信息
private final HashMap clusterAddrTable;
// Broker的状态信息,NameServer每次收到心跳包时会替换该信息
private final HashMap brokerLiveTable;
// Broker对应的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃
private final HashMap filterServerTable;
}
NameServer 存储的信息,就在RouteInfoManager
这个类里。
路由注册
RockerMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30s 向集群中所有的 NameServer 发送心跳包,NameServer收到心跳包会先更新 RouteInfoManager
类中 brokerLiveTable
中 BrokerLiveInfo
的 lastUpdateTimestamp
,然后每隔 10s 扫描一次 brokerLiveTable
,如果连续 120s 没有收到心跳包,NameServer 将移除该 Broker 的路由信息,同时关闭 Socket 连接。
路由删除
上边提到了 NameServer 如果连续 120s 没有收到 Broker 的心跳包,将移除该 Broker 的路由信息。还有一点就是 Broker 在 正常关闭的情况下,会执行 unregisterBroker 命令。
路由发现
RockerMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。
NameServer架构设计
Broker
上文多次提到了 Broker,Broker 是 RocketMQ 的一个核心组件,大部分重量级工作都是通过 Broker 来完成的。Borker 处理各种请求和存储消息,决定整个 RocketMQ 体系的吞吐性能、可靠性和可用性。
CommitLog文件
RocketMQ 在消息写入的过程中追求极致的磁盘顺序写,所有主题的消息全部写入一个文件,这个文件就是 CommitLog 文件。所有消息按照抵达顺序依次写入 CommitLog 文件,消息一旦写入不支持修改。
写入的每条都会引入一个身份标志,就是 消息物理偏移量(消息存储在文件的起始位置)。CommitLog 文件的命名方式极具技巧性,使用存储在该文件的第一条消息在整个 CommitLog 文件组中的偏移量来命名。这样做的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位到这个文件的位置,然后利用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。
ConsumeQueue文件
所有主题的消息都写入了 CommitLog 文件,根据主题从 CommitLog 文件中检索消息这并不是一个好主意,为了解决基于 Topic 的消息检索问题,RocketMQ 引入了 ConsumeQueue 文件。简单地说,ConsumeQueue 文件就是 CommitLog 文件基于 Topic 的索引文件。
ConsumeQueue 每个条目长度固定(8字节 CommitLog 物理偏移量、4字节消息长度、8字节 Tag 哈希码),固定长度的好处是可以使用访问类似数组下标的方式快速定位条目,极大的提高了 ConsumeQueue 文件的读取性能。
Index文件
ConsumeQueue 文件解决了基于 Topic 查找消息的问题,如果想基于消息的某一个属性进行查找,那就需要 Index 文件登场了。
Index 文件基于物理磁盘文件实现哈希索引。Index 文件由 40 字节的文件头、500万个哈希槽、2000万个 Index 条目组成,每隔哈希槽 4 个字节,每个 Index 条目含有 20 个字节(4字节索引Key的哈希码、8字节物理偏移量、4字节时间戳、4字节的前一个 Index 条目)。
内存映射
虽然顺序写大大提高了 I/O 效率,但是基于文件的存储采用常规的 Java 文件操作 API,性能提升将会很有限,所以 RockerMQ 引入了 内存映射。将磁盘文件映射到内存中,以操作内存的方式操作磁盘(在Linux服务器中使用的就是操作系统的页缓存),性能又得到了提升。
刷盘策略
引入了内存映射和页缓存机制,使 RocketMQ 的写入性能得到了极大的保证,但是又引出了一个问题,Broker 收到客户端发送的消息后,是存储到页缓存中就返回成功,还是要持久化到磁盘才算成功呢?RocketMQ 提供了同步刷盘和异步刷盘。
- 同步刷盘:同步刷盘即持久化成功后才向客户端返回成功。以牺牲写入性能为代价。
- 异步刷盘:异步刷盘是指 Broker 将消息存储到页缓存后就立即返回成功,然后开启一个异步线程定时将内存中的数据写入磁盘,默认间隔时间 500ms。
消息写入页缓存,消息消费时从页缓存中读取,高并发时压力还是比较大,为了降低页缓存的使用压力,RocketMQ 引入了 transientStorePoolEnable 机制,即内存级别的读写分离机制。
内存级别的读写分离机制:RocketMQ 通过 transientStorePoolEnable 机制,将消息先写入堆外内存并立即返回,然后异步将堆外内存的数据提交到页缓存,再异步刷盘持久化。消息消费时还是从页缓存中读取,就形成了内存级别的读写分离。该机制的缺点是如果 Broker 异常退出堆外内存的数据会丢失。
Broker高可用
为了提高消息消费的高可用,避免 Broker 发生单点故障,使得存储在 Broker 上的消息无法及时消费,RocketMQ 引入了 Broker 的主从同步机制。即消息到达主服务器后,需要将消息同步到消息从服务器,如果主服务器 Broker 宕机,消息消费者可以从从服务器拉取消息。
主题(Topic)
RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过 TopicName 来做唯一标识和区分。
标签(Tag)
标签是 RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者可以通过订阅特定的标签来实现细粒度过滤。
生产者(Producer)
生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。
消息发送
RocketMQ 支持同步、异步和单向三种消息发送方式。
- 同步(sync):发送者向 RocketMQ 执行发消息 API 时,同步等待,直到消息服务器返回发送结果。
- 异步(async):发送者向 RocketMQ 执行发消息 API 时,指定消息发送成功后的回调函数,调用消息发送 API 后立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
- 单向(one way):发送者向 RocketMQ 执行发消息 API 时,直接返回,不等待消息服务器的结果也不注册回调函数。
消息发送高可用
为了实现消息发送的高可用,RocketMQ 有两个非常重要的特性。
- 消息发送重试机制:在消息发送时如果出现失败,默认会重试两次。
- 故障规避机制:当消息第一次发送失败时,如果下一次消息还是发送到刚刚失败的 Broker 上,大概率还是会失败,为了保证重试的可靠性,在重试时会尽量避开刚刚接收失败的 Broker,而是选择其他 Broker 上的队列进行发送。
RocketMQ 默认使用轮询算法进行路由的负载均衡。在消息发送时支持自定义的负载均衡算法,需要特别注意的是,使用自定义的路由负载算法后 RocketMQ 的重试机制将失效。
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
这是MQProducer
中一个自定义队列选择方法,参数MessageQueueSelector
消息队列选择器可选择自定义的消息发送到的队列。
消息发送流程
消费者(Consumer)
消费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
消费者分组(ConsumerGroup)
消息消费以组的模式展开,消费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。在 RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
一个消费者可以包含多个消费者,每个消费者组可以订阅多个主题。
消费模式
RocketMQ 消费者组之间有集群模式和广播模式两种消费模式。
- 集群模式:当前主题下的同一条消息只允许被其中一个消费者消费。
- 广播模式:当前主题下的同一条消息将被集群内的所有消费者消费一次。
集群模式下,多个消费者的话则需要对消息队列进行负载,负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
消息传送
RocketMQ 消息服务器和消费者之间的消息传递有两种方式:推模式和拉模式。
- 推模式:推模式是消息到达消息服务器后,再推送给消息消费者。
- 拉模式:是消费端主动发起拉取消息的请求。
RocketMQ 消息推模式基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。就是说RocketMQ 并没有真正实现推模式,而是消费者主动向消息服务器拉取消息。
如果消息消费者向消息服务器发送拉取请求,消息并未到达消息队列,且未启用长轮询机制的话,则会在服务端等待一段时间后(挂起),再去判断消息是否已到达消息队列。如果消息未到达,则提示消息拉取客户端 PULL_NOT_FOUND
(消息不存在),如果开启长轮询模式,RocketMQ 一方面会每隔 5s 轮询检查一次消息是否可达,同时一有新消息到达后,立即通知挂起线程再次验证新消息是否是自己感兴趣的,如果是则从 CommitLog 文件提取消息返回给消息拉取客户端,否则挂起超时,超时时间由消息拉取方在消息拉取时封装在请求与参数中,推模式默认 15s。
消费方式
RocketMQ 支持并发消费与顺序消费两种消费方式。
- 并发消费:多个消费者同时消费同一批消息以提高处理速度,不考虑消息的先后顺序。
- 顺序消费:指同一时刻,一个队列只有一个消费者线程在消费。会在 Broker 端锁队列。
RocketMQ 支持局部顺序消费,队列存在天然的顺序,也就是保证同一个消息队列上的消息按顺序消费。上边讲生产者发送消息的时候,可以自定义队列选择,消费者消费时,就可以通过队列的特性来顺序消费了。如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为 1,注意这样将牺牲高可用性。
并发消费和顺序消费的实现逻辑在源码 接口ConsumeMessageService
的两个实现类ConsumeMessageConcurrentlyService
和ConsumeMessageOrderlyService
里,感兴趣的朋友可以看看。
消息重试
RocketMQ 消息重试是以消费者组为单位的,消息重试主题名为 %Retry% + 消费者组名。消费者在启动时会自动订阅该主题,参与该主题的消息队列负载。要注意的是广播模式没有内置的消息重试机制。
RocketMQ 的默认重试次数为16次,且默认重试间隔时间为(10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),超过 16 次都是 2h。
定时消息
定时消息 是 RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。RocketMQ 并不支持任意时间精度的定时调度(支持的话将不可避免地带来巨大的性能消耗)。消息延迟级别在 Broker 端通过 messageDelayLevel
来控制,默认为 (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)。上文消息重试也提到过这组数字,消息重试也正是借助定时任务实现的。RocketMQ 高版本也开始了支持自定义延迟时间。
public class Message implements Serializable {
。。。
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
)
发送延迟消息时,只需要调用Message
类中的setDelayTimeLevel
方法来设置延时级别。level
值是 1到18 的int
数值,对应 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
事务消息
事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
事务消息发生在 Producer 和 Broker 之间。事务消息通过TransactionMQProducer
实现。通过TransactionMQProducer
类中的sendMessageInTransaction(final Message msg,final Object arg)
方法发送半消息后,监听类实现如下。
public class MyLocalTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
// 。。。
// 返回事务状态
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查事务状态
// 。。。
// 返回事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
}
executeLocalTransaction
方法来执行本地事务,返回事务状态。
checkLocalTransaction
用来检查本地事务状态,并回应消息队列的检查请求。
消息过滤
RocketMQ 支持两种消息过滤模式:表达式(TAG、SQL92)与类过滤模式。
- TAG模式 是最常用的消息过滤模式,它基于消息的标签(Tag)进行过滤。
- SQL92模式 是一种更灵活的消息过滤模式,是通过消息的属性运行类似 SQL 过滤表达式进行条件匹配,消息发送时需要设置用户的属性putUserProperty方法设置属性。
- 类过滤模式 是一种基于消费者类名的过滤方式,在消费端实现过滤逻辑。
消息类型(MessageType)
消息类型是 RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时消息。
Apache RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题Topic只允许发送一种消息类型的消息,这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认关闭,推荐通过服务端参数 enableTopicMessageTypeCheck 手动开启校验。
消息位点(MessageQueueOffset)
消息是按到达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。每个消息消费者可以通过消息位点来确定自己消费的起始位置。也就是消息在消息队列中的偏移量。
消费位点(ConsumerOffset)
一条消息被某个消费者消费完成后不会立即从队列中删除,RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
消费者默认是从最后一个位点开始消费的,注意这个配置只在消费者第一次启动时和重平衡时生效。
-
对于集群消费。消费位点是由 RocketMQ 服务器维护和管理的。在集群消费模式下,同一个消费组内的消费者共同消费同一个消息队列,每个消费者会独立地维护自己的消费位点。RocketMQ 服务器会将消费位点信息保存在 Broker 端或者 NameServer 端中(不同版本或不同配置)。这样,在消费者重启、重平衡等情况下,服务器能够正确地恢复消费者的消费进度。
-
对于广播消费。消费位点是由各个消费者自行管理并保存在本地存储中。在广播消费模式下,每个消费者都独立地消费所有消息,因此每个消费者都需要维护自己的消费位点。消费者可以选择将消费位点保存在文件系统、数据库或其他外部存储中,并在消费者启动时从本地存储中加载上次消费的位点,并在消费过程中定期将消费位点刷新到本地存储。
注意事项
- 配置管理:RocketMQ 各个组件都有很多配置,使用前务必仔细了解并正确配置。
- 队列数量:队列数量多少关系到消费的能力上限。一个队列只能被一个消费者消费(假如一个Topic有8个队列,启动了 10 个消费者,那么是有两个消费者空闲的)。
- 消息可靠:RocketMQ 默认情况下是异步刷盘的,根据业务场景可以强制立即刷盘。
- 消费者组:确保不同的消费者组使用不同的消费者名称。同一个消费者组只配置一种消费方式(碰到过一个消费者组有几个集群消费者有几个广播消费者的事情,竟然也启动起来了。。。)。
- 消费顺序:要注意是否需要消息的顺序消费,RocketMQ 可以保证单个 Queue 中消息的顺序消费,如果需要保证消息的顺序,可以把相关消息发送到同一个 Queue 中。
有帮助的话方便点个赞吧😄。