队列模型和发布订阅模型
我们说到消息队里的时候总会提到消息队列的两种消息模型:
本文重点探讨一下发布订阅模型的不同实现思路。
队列模型
早期的消息队列,就是按照数据结构的"队列"来设计,队列支持出队、入队操作并具有先进先出的特点。参考下面这个图,生产者(Producer)发送消息就是入队操作,消费者(Consumer)接收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为"队列"。
如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份,但是在真实的业务系统中,特别是一些大流量高并发的场景是非常不友好的,对生产者系统压力比较大,而且服务对于数据需要冗余存储,存储压力也比较大。
发布订阅模型
为了解决上述问题,衍生出了发布订阅模型,也叫生产者消费者模型(为了便于描述,下面统一使用生产者消者费模型)。
在生产者消费者模式中,消息的发送方称为生产者(Producer),消息的接收方称为消费者(Consumer),服务端存放消息的容器称为主题(Topic)。生产者将消息发送到主题中,消费者在接收消息之前需要先"订阅主题"。"订阅"在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,消费者都可以接收到主题的所有消息。在消息领域的历史上很长的一段时间,队列模式和生产者消费者模式是并存的,有些消息队列同时支持这两种消息模型,比如 ActiveMQ。
我们仔细对比一下这两种模型,生产者和生产者对应,消费者和消费者对应,队列和主题对应(实际上在具体的MQ实现中主题下往往有多个分区,分区和队列更具有类比关系),并没有本质的区别。它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。实际上,在这种发布 -订阅模型中,如果只有一个消费者者就和队列模型就基本是一样的。也就是说发布订阅模型在功能层面上是可以兼容队列模型的。目前主流的MQ如Kafka, RocketMQ,Qmq都是采用发布订阅模型。
Kafka消息模型
kafka架构
核心组件:
- Broker:本质上是部署了Kafka服务端的进程,所有的后端逻辑都在Broker中,Kafka在Broker层面都是对等的,没有出从之分
- Producer:生产者,向主题发布新消息的客户端进程
- Consumer:消费者,从主题消费消息的客户端进程
- Zookeeper:Kafka控制器大量使用Zookeeper Watcher功能实现对集群的协调管理,从而实现主题管理(创建、删除、增加分区),分区重分配,副本领导者选举,集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机),数据服务等,不过在Kafka2.8版本之后引入quorum service(底层是基于Raft)来替代Zookeeper,在后续的版本中会彻底移除对Zookeeper的依赖。
其他概念:
- Record,消息,Kafka作为一个消息引擎,这里的消息就是指 Kafka 处理的主要对象
- Topic,主题,主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务
- Partition,分区,一个有序不变的消息序列,每个主题下可以有多个分区
- Offset,消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值
- Replica,副本,Kafka 中同一条消息能够被存储到多个地方以提供数据冗余,也就是所谓的副本。副本还分为领导者副本和追随者副本的角色划分。副本是在分区维度下的概念,每个分区可配置多个副本实现高可用
- Consumer Offset,消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移
- Consumer Group,消费者组,多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐
Kafka分区和副本
分布式系统中经常会提到分区,分布式系统带来的主要好处之一就是扩展性。不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。
Kafka 有主题(Topic)的概念,是承载真实数据的逻辑容器,在主题之下还分为若干个分区,Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,在同一个分区下如果有多个副本是会保存同样的消息。比较有意思的是Kafka的副本和MySQL的副本(即主从架构)不同,Kafka的副本只有Leader副本才能提供读写,Follower副本仅会同步消息,当且仅当Leader副本宕机才会切换到Follower副本进行读写,所以副本实现的是高可用的功能(HA),而分区才是实现数据扩展的功能(Scalability)。
不同的分区能够被放置到不同节点的机器上(Broker),而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且还可以通过添加新的节点机器来增加整体系统的吞吐量。
官网上的这张图非常清晰地展示了 Kafka 的三级结构
Kafka存储结构
上图描述的Kafka的存储架构,Kafka中的消息以主题为基本单位进行归类,各个主题在逻辑上互相独立,每个主题又可以分为一到多个分区,每个分区可以分为若干个副本,在物理上每个主题下的每个分区的副本会对应一个日志Log,所以Kafka的消费模型设计最终是落实到分区副本维度的。
为防止文件过大,Kafka引入了日志分段的概念(LogSegment),将一个大的日志Log拆分成多个小文件,上图Log 和LogSegnient 也不是纯粹物理意义上的概念, Log 在物理上只以文件夹的形式存储,而每个个LogSegment 对应于磁盘上的一个日志文件和两个索引文件(还有其余文件暂不考虑),Kafka日志主要有index、timestamp、log 三种类型。
向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment 都不能写入数据。为了方便描述,我们将最后一个LogSegment 称为"activeSegment",即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment 满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment 。为了便于消息的检索,每个LogSegment 中的日志文件(以".log"为文件后缀)都有对应的两个索引文件:偏移量索引文件(以".index"为文件后缀〉和时间戳索引文件(以".timeindex"为文件后缀)。每个LogSegment 都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset 。偏移量是一个64 位的长整型数,日志文件和两个索引文件都是根据基
准偏移量( baseOffset )命名的,名称固定为20 位数字,没有达到的位数则用0 填充。比如第
一个LogSegment 的基准偏移量为0 ,对应的日志文件为00000000000000000000.log,同理偏移量索引文件为00000000000000000000.index,时间索引文件为00000000000000000000.timeindex。
日志
Kafka发展到2.0版本已经有V0、V1、V2,现在只用考虑V2版本的消息格式
RecordBatch
字段 | 说明 | 字节数 |
---|---|---|
first offset | 开始位移 | 8 |
length | 消息长度 | 4 |
partition leader epoch | 当前分区leader副本的任期 | 4 |
magic | 魔数 | 1 |
attributes | 属性 | 2 |
last offset delta | 最后位移 | 4 |
first timestamp | 开始时间戳 | 8 |
max timestamp | 最大时间戳 | 8 |
producer id | 生产者id | 8 |
producer epoch | 生产者任期 | 2 |
first sequence | 第一个序列号 | 4 |
records count | 记录数量 | 4 |
records | 存放具体的消息 |
Record
字段 | 说明 | 字节数 |
---|---|---|
length | 消息总长度 | varint |
attributes | 弃用,但还是在消息格式中占据1B的大小,以备未来的格式扩展 | 1 |
timestamp delta | 消息内容 | varlong |
offset delta | 时间戳增量。通常一个 timestamp 需要占用8个字节,如果像这里一样保存与 RecordBatch 的起始时间戳的差值,则可以进一步节省占用的字节数 | varint |
key length | key长度 | varint |
key | 消息的key | key length |
value length | value长度 | varint |
value | 消息的值 | value length |
headers count | 消息header数量 | |
headers | 消息header内容 |
Header
字段 | 说明 | 字节数 |
---|---|---|
header key length | 消息header key长度 | varint |
header key | 消息header value内容 | utf8 |
header value length | 消息header值长度 | varint |
header value | 消息header值内容 | header value length |
Kafka消息日志存储比较有特点:
由此可见Kafka在细节上对日志存储做了很多优化。
位移索引
上面说到位移索引和时间索引是用于提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置。
字段 | 说明 | 字节数 |
---|---|---|
relativeOffset | 相对偏移量,表示消息相对于 baseOffset 的偏移量,当前索引文件的文件名即为 baseOffset 的值 | 4 |
position | 物理地址,也就是消息在日志分段文件中对应的物理位置 | 4 |
为了节约存储空间,位移索引采用稀疏索引的方式进行组织,每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为4096,即 4KB)的消息时会写入一个索引项(时间索引也是如此)。使用稀疏索引的好处是可以节省空间,将更多的索引加载到内存中,从而可以更快的进行读取和消费。
Kafka消费消息时会通过位移offset查找消息,Kafka基于二分搜索在index索引文件中搜索找到不大于offset的相对位移[relativeOffset, position],然后在log日志从position开始顺序查找,直到查找到偏移量为offset的消息。
时间索引
时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
字段 | 说明 | 字节数 |
---|---|---|
timestamp | 当前日志分段最大的时间戳 | 8 |
relativeOffset | 相对偏移量,表示消息相对于 baseOffset 的偏移量,当前索引文件的文件名即为 baseOffset 的值 | 4 |
时间索引可以方便的找到某个时间点(timestamp)写入的消息,如从今天凌晨开始消费就可以用到这个功能。
kafka生产和消费流程
由于我们重点关注消息模型,所以Producer和Consumer客户端的设计和操作不用太多考虑,只用考虑服务端的大致流程。
写入消息
Kafka写入消息的服务端比较简单,最核心的流程就是将消息数据顺序写入到日志中:
找到最近的日志段LogSegment顺序写入Log日志,如果LogSegment文件已满,创建一个新的LogSegment并写入
消费消息
Kafka消费消息的核心的流程就是从日志文件中找到对应offset的消息。
在同一个消费者组中,消费者数量和分区的数量是一一对应的,在创建主题时就必须要考虑好消费者数量以便确定好分区的数量,一旦设定好分区后再进行修改就是一个非常重的操作。
总结
Kafka的消息模型是以分区副本日志为核心的,生产消费都是基于Log文件进行操作,索引文件只用于加快查找速度。
整体架构比较简单,但是细节设计上有很多巧妙的地方,比如消息压缩,变长字段,稀疏索引的设计等。
此外在消息队列设计上也有一些值得借鉴的点:
当然如果把Kafka作为一个纯粹的消息队列而不是用于大数据处理,Kafka也存在一些问题
RocketMQ消息模型
RocketMQ架构 & 存储结构
RocketMQ的架构和存储结构可以见: RocketMQ磁盘存储
RocketMQ集群主要由四个部分组成:NameServer集群,Producer,Consumer,Broker集群。
集群架构和Kafka相比有两个比较大的区别:
Master Broker支持读和写,Slave Broker仅支持读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读,当一个Master机器出现故障后,Consumer仍能从Slave消费,不影响Consumer程序。在创建Topic的时候,把Topic的多个Message Queue创建在多个 Broker组上(相同Broker名称不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。
在RocketMQ中对应的Kafka的分区Partition概念的是队列,物理实现中对应的ConsumerQueue日志。
RocketMQ生产消费流程
RocketMq主要也是有三种日志组成:CommitLog,ConsumerQueue,IndexFile,RocketMQ借鉴了Kafka日志分段的概念(LogSegment),CommitLog,ConsumerQueue,IndexFile都会拆分成指定大小的小文件方便进行读写操作。
- CommitLog: 存储的是具体的消息数据,与Kafka不同在同一个Broker中所有Topic的日志都会存储在同一个CommitLog文件中,这样可以在分区数很多时更好的实现文件顺序读写
- ConsumerQueue:消息的逻辑队列,存储的是指向物理存储的地址,每个 Topic下的每个 Message Queue都有一个对应的 ConsumeQueue文件,是一个二级索引的实现,ConsumeQueue主要用于消息消费时,消费者拉取消息使用
- IndexFile为全局索引,不区分主题队列,所有消息索引在一个文件中,用于实现消息的关键字查询
写入消息
RocketMQ Broker端接收到消息后会顺序写入CommitLog中,通过ReputMessageService定时任务会每隔1s执行一次利用CommitLog中的数据异步构建ConsumerQueue和IndexFile。
Commitlog在消息写入之后,由专门的线程产生消息转发任务从CommitLog中一条一条的读消息,同步到ConsumerQueue中的只有commitlog offset,size,tag hashcode这三个信息。ConsumerQueue文件并不是像commitLog一样文件直接堆放在一起,而是在consumerQueue目录下,根据topic/queueId,再分了两层目录,提升了检索的效率。
消费消息
ConsumerQueue队列和消费者是一一对应的,RocketMQ从ConsumerQueue索引文件查找消息的真实物理地址,然后再从CommitLog中对应位置查找消息并返回给客户端。
虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取作为cache存到内存中,加速后续的读取速度。
ConsumeQueue每个索引项仅20B,1G的空间就能存储6000w个索引项,在实际情况中大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度 。
总结
RocketMQ设计中吸收了很多Kafka的成功经验,并且从设计之初就是作为一个纯粹的消息队列,所以RocketMQ原生就支持MQ的高级功能,作为后来者RocketMQ也有很多可取之处
虽然RocketMQ可以通过对ConsumerQueue扩容增加消费能力,但是原有的ConsumerQueue并不会新ConsumerQueue中,所以对于历史积压消息也没有很好的效果。
QMQ消息模型
QMQ集群架构
主要组件
- MetaServer提供集群管理和集群发现的作用,类比RocketMQ的NameServer以及Kafka的Controller + ZK
- Server提供实时消息服务,可以类比成Broker
- DelayServer 提供延时/定时消息服务,延时消息先在DelayServer排队,时间到之后再发送给Server(基于时间轮)
- Producer 消息生产者
- Consumer 消息消费者
QMQ集群交互流程,其中1,2是集群启动流程,37是生产者消息发送过程,811是消费者消息消费过程
在QMQ中主题命名为Subject,其实就是对应Topic。
QMQ存储结构
QMQ存储结构可以概括为三部分:MessageLog, ConsumerLog,PullLog。
Message Log
Message Log存储的是所有Subject的日志,单个文件大小默认为1GB(即一个Segment)。如上图所示,每个方格表示一条消息,方格外的数字表示消息在Message Log中的位移(逻辑位移)。
字段 | 大小 | 说明 |
---|---|---|
magic code | 4 | 魔数,当前是V3版本消息,0xA1B2C302 |
attributes | 1 | 属性标志位,1. 1,表示当前这条是没有实际的payload一般是用于数据填充,此类型消息只有magic code,attributes,timestamp三个字段,并且这条消息后面都会添0用于补齐整个LogSegment文件(使其大小达到1GB) |
| timestamp | 8 | 生成当前这条消息是的时间戳,毫秒级, 使用的System.currentTimeMillis() |
| message logical offset | 8 | 当前这条消息在ConsumerLog中的顺序,即MessageLog内部每个Subject内部消息的顺序 |
| subject size | 2 | Subject长度 |
| subject | subject size | 存储Subject的名称,长度为subject size字段的取值 |
| payload crc32 | 8 | 消息体的CRC校验值 |
| payload size | 4 | 消息体长度 |
| payload | payload size | 存储具体的消息体,长度为payload size字段的取值 |
由上可知,
Consumer Log
Consumer Log可以理解为每个Subject的一个Partition,一个Subject对应一个Consumer Log。每个方格表示Subject一条消息,方格内的数字表示消息在Message Log中的位移(逻辑位移),方格外的数字表示消息在Consumer Log的位移(逻辑位移)。
ConsumerLog有v1和v2两个版本,这里只关注v2版本的日志存储格式。
字段 | 大小 | 说明 |
---|---|---|
magic code | 2 | 魔数,当前是V2版本,0xA3B2 |
payload type | 2 | Message Log索引,固定值为0xC10 |
timestamp | 8 | 生成当前这条消息是的时间戳,毫秒级, 使用的System.currentTimeMillis() |
wrote offset | 8 | 写入位移,对应MessagLog中的位移 |
wrote bytes | 4 | 写入字节数 |
header size | 2 | 头部大小 |
padding1 | 2 | 用于填充消息大小,凑足32个字节,全部填充0 |
padding2 | 4 |
Pull Log
Pull Log与 consumer 一一对应,是消费者的消费记录文件。Subject+ConsumerGroup+ConsumerId为维度,保留了每一个消费者的历史消费记录,这样可以在消费者重启之后快速定位到之前消费的位置。每个方格表示消费者消费的一条消息,方格内的数字表示消息在Consumer Log中的位移(逻辑位移),方格外的数字表示消费者的消费位移。
QMQ通过添加一层拉取的Pull Log来动态映射Consumer与Partition(Consumer Log)的逻辑关系,这样不仅解决了Consumer的动态扩容缩容问题,还可以继续使用一个 offset 表示消费进度。
字段 | 大小 | 说明 |
---|---|---|
message sequence | 8 | Consumer Log的Sequence号 |
总结
RocketMQ在Kafka的基础上做了一个中间层逻辑队列ConsumerQueue,QMQ则是更近一步在RocketMQ上基础上引入了PullLog中间层,彻底实现队列和消费者的解耦。回到之前说过的那个问题,如果生产者消息激增,QMQ可以扩展ConsumerLog队列,从而提升消息写入性能,如果增加消费者会动态生成新的PullLog,不会出现Kafka和RocketMQ这种队列和消费者1:1,对于消费者只能干等的场景。