消息队列的消费模型讨论

2023年 10月 10日 85.0k 0

队列模型和发布订阅模型

我们说到消息队里的时候总会提到消息队列的两种消息模型:

  • 队列模型
  • 发布订阅模型
  • 本文重点探讨一下发布订阅模型的不同实现思路。

    队列模型

    早期的消息队列,就是按照数据结构的"队列"来设计,队列支持出队、入队操作并具有先进先出的特点。参考下面这个图,生产者(Producer)发送消息就是入队操作,消费者(Consumer)接收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为"队列"。

    如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份,但是在真实的业务系统中,特别是一些大流量高并发的场景是非常不友好的,对生产者系统压力比较大,而且服务对于数据需要冗余存储,存储压力也比较大。

    发布订阅模型

    为了解决上述问题,衍生出了发布订阅模型,也叫生产者消费者模型(为了便于描述,下面统一使用生产者消者费模型)。

    在生产者消费者模式中,消息的发送方称为生产者(Producer),消息的接收方称为消费者(Consumer),服务端存放消息的容器称为主题(Topic)。生产者将消息发送到主题中,消费者在接收消息之前需要先"订阅主题"。"订阅"在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,消费者都可以接收到主题的所有消息。在消息领域的历史上很长的一段时间,队列模式和生产者消费者模式是并存的,有些消息队列同时支持这两种消息模型,比如 ActiveMQ。

    我们仔细对比一下这两种模型,生产者和生产者对应,消费者和消费者对应,队列和主题对应(实际上在具体的MQ实现中主题下往往有多个分区,分区和队列更具有类比关系),并没有本质的区别。它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。实际上,在这种发布 -订阅模型中,如果只有一个消费者者就和队列模型就基本是一样的。也就是说发布订阅模型在功能层面上是可以兼容队列模型的。目前主流的MQ如Kafka, RocketMQ,Qmq都是采用发布订阅模型。

    Kafka消息模型

    kafka架构

    核心组件:

    • Broker:本质上是部署了Kafka服务端的进程,所有的后端逻辑都在Broker中,Kafka在Broker层面都是对等的,没有出从之分
    • Producer:生产者,向主题发布新消息的客户端进程
    • Consumer:消费者,从主题消费消息的客户端进程
    • Zookeeper:Kafka控制器大量使用Zookeeper Watcher功能实现对集群的协调管理,从而实现主题管理(创建、删除、增加分区),分区重分配,副本领导者选举,集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机),数据服务等,不过在Kafka2.8版本之后引入quorum service(底层是基于Raft)来替代Zookeeper,在后续的版本中会彻底移除对Zookeeper的依赖。

    其他概念:

    • Record,消息,Kafka作为一个消息引擎,这里的消息就是指 Kafka 处理的主要对象
    • Topic,主题,主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务
    • Partition,分区,一个有序不变的消息序列,每个主题下可以有多个分区
    • Offset,消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值
    • Replica,副本,Kafka 中同一条消息能够被存储到多个地方以提供数据冗余,也就是所谓的副本。副本还分为领导者副本和追随者副本的角色划分。副本是在分区维度下的概念,每个分区可配置多个副本实现高可用
    • Consumer Offset,消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移
    • Consumer Group,消费者组,多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐

    Kafka分区和副本

    分布式系统中经常会提到分区,分布式系统带来的主要好处之一就是扩展性。不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。

    Kafka 有主题(Topic)的概念,是承载真实数据的逻辑容器,在主题之下还分为若干个分区,Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,在同一个分区下如果有多个副本是会保存同样的消息。比较有意思的是Kafka的副本和MySQL的副本(即主从架构)不同,Kafka的副本只有Leader副本才能提供读写,Follower副本仅会同步消息,当且仅当Leader副本宕机才会切换到Follower副本进行读写,所以副本实现的是高可用的功能(HA),而分区才是实现数据扩展的功能(Scalability)。

    不同的分区能够被放置到不同节点的机器上(Broker),而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且还可以通过添加新的节点机器来增加整体系统的吞吐量。

    官网上的这张图非常清晰地展示了 Kafka 的三级结构

    Kafka存储结构

    上图描述的Kafka的存储架构,Kafka中的消息以主题为基本单位进行归类,各个主题在逻辑上互相独立,每个主题又可以分为一到多个分区,每个分区可以分为若干个副本,在物理上每个主题下的每个分区的副本会对应一个日志Log,所以Kafka的消费模型设计最终是落实到分区副本维度的。

    为防止文件过大,Kafka引入了日志分段的概念(LogSegment),将一个大的日志Log拆分成多个小文件,上图Log 和LogSegnient 也不是纯粹物理意义上的概念, Log 在物理上只以文件夹的形式存储,而每个个LogSegment 对应于磁盘上的一个日志文件和两个索引文件(还有其余文件暂不考虑),Kafka日志主要有index、timestamp、log 三种类型。

    向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment 都不能写入数据。为了方便描述,我们将最后一个LogSegment 称为"activeSegment",即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment 满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment 。为了便于消息的检索,每个LogSegment 中的日志文件(以".log"为文件后缀)都有对应的两个索引文件:偏移量索引文件(以".index"为文件后缀〉和时间戳索引文件(以".timeindex"为文件后缀)。每个LogSegment 都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset 。偏移量是一个64 位的长整型数,日志文件和两个索引文件都是根据基

    准偏移量( baseOffset )命名的,名称固定为20 位数字,没有达到的位数则用0 填充。比如第

    一个LogSegment 的基准偏移量为0 ,对应的日志文件为00000000000000000000.log,同理偏移量索引文件为00000000000000000000.index,时间索引文件为00000000000000000000.timeindex。

    日志

    Kafka发展到2.0版本已经有V0、V1、V2,现在只用考虑V2版本的消息格式

    RecordBatch

    字段 说明 字节数
    first offset 开始位移 8
    length 消息长度 4
    partition leader epoch 当前分区leader副本的任期 4
    magic 魔数 1
    attributes 属性 2
    last offset delta 最后位移 4
    first timestamp 开始时间戳 8
    max timestamp 最大时间戳 8
    producer id 生产者id 8
    producer epoch 生产者任期 2
    first sequence 第一个序列号 4
    records count 记录数量 4
    records 存放具体的消息

    Record

    字段 说明 字节数
    length 消息总长度 varint
    attributes 弃用,但还是在消息格式中占据1B的大小,以备未来的格式扩展 1
    timestamp delta 消息内容 varlong
    offset delta 时间戳增量。通常一个 timestamp 需要占用8个字节,如果像这里一样保存与 RecordBatch 的起始时间戳的差值,则可以进一步节省占用的字节数 varint
    key length key长度 varint
    key 消息的key key length
    value length value长度 varint
    value 消息的值 value length
    headers count 消息header数量
    headers 消息header内容

    Header

    字段 说明 字节数
    header key length 消息header key长度 varint
    header key 消息header value内容 utf8
    header value length 消息header值长度 varint
    header value 消息header值内容 header value length

    Kafka消息日志存储比较有特点:

  • Kafka消息采用批量存储压缩存储空间,多条消息Record会聚合成一个RecordBatch,可以看到first offset到records count都是这个RecordBatch的整体属性,在写入消息时通过直接批量写入整个RecordBatch提升写性能;读的时候每个RecordBatch的first offset和records count,就可以很容易找到这records count个消息
  • Kafka v2版本的消息还引入了变长字段,用于压缩数字类型的存储空间,感兴趣可以看:Kafka消息格式中的变长字段(Varints)
  • 由此可见Kafka在细节上对日志存储做了很多优化。

    位移索引

    上面说到位移索引和时间索引是用于提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置。

    字段 说明 字节数
    relativeOffset 相对偏移量,表示消息相对于 baseOffset 的偏移量,当前索引文件的文件名即为 baseOffset 的值 4
    position 物理地址,也就是消息在日志分段文件中对应的物理位置 4

    为了节约存储空间,位移索引采用稀疏索引的方式进行组织,每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为4096,即 4KB)的消息时会写入一个索引项(时间索引也是如此)。使用稀疏索引的好处是可以节省空间,将更多的索引加载到内存中,从而可以更快的进行读取和消费。

    Kafka消费消息时会通过位移offset查找消息,Kafka基于二分搜索在index索引文件中搜索找到不大于offset的相对位移[relativeOffset, position],然后在log日志从position开始顺序查找,直到查找到偏移量为offset的消息。

    时间索引

    时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

    字段 说明 字节数
    timestamp 当前日志分段最大的时间戳 8
    relativeOffset 相对偏移量,表示消息相对于 baseOffset 的偏移量,当前索引文件的文件名即为 baseOffset 的值 4

    时间索引可以方便的找到某个时间点(timestamp)写入的消息,如从今天凌晨开始消费就可以用到这个功能。

    kafka生产和消费流程

    由于我们重点关注消息模型,所以Producer和Consumer客户端的设计和操作不用太多考虑,只用考虑服务端的大致流程。

    写入消息

    Kafka写入消息的服务端比较简单,最核心的流程就是将消息数据顺序写入到日志中:

  • Producer向Broker发起写入消息请求,服务端根据消息中的主题名和分区找到对应的日志文件位置
  • 找到最近的日志段LogSegment顺序写入Log日志,如果LogSegment文件已满,创建一个新的LogSegment并写入

  • 每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为4096,即 4KB)的消息时,会向偏移量索引文件和时间戳索引文件分别新增加一个索引项
  • 消费消息

    Kafka消费消息的核心的流程就是从日志文件中找到对应offset的消息。

  • Consumer向Broker发起消费消息请求,服务端根据消息中的主题名和分区找到对应的日志文件位置
  • 使用位移日志进行二分搜索找到消费请求的offset的索引位置,获取消息在Log文件的物理位置
  • 从Log日志中获取消息
  • 在同一个消费者组中,消费者数量和分区的数量是一一对应的,在创建主题时就必须要考虑好消费者数量以便确定好分区的数量,一旦设定好分区后再进行修改就是一个非常重的操作。

    总结

    Kafka的消息模型是以分区副本日志为核心的,生产消费都是基于Log文件进行操作,索引文件只用于加快查找速度。

    整体架构比较简单,但是细节设计上有很多巧妙的地方,比如消息压缩,变长字段,稀疏索引的设计等。

    此外在消息队列设计上也有一些值得借鉴的点:

  • 顺序文件读写,提高磁盘读写能力,从而实现高性能的生产和消费能力
  • 主题-分区-副本的三层架构,既实现了高可用也在一定程度提高了扩展性
  • 当然如果把Kafka作为一个纯粹的消息队列而不是用于大数据处理,Kafka也存在一些问题

  • 每个topic下的分区都写在不同的文件,如果同一个Broker太多就会创建太多的文件,写入时性能会有影响(无法保证顺序写)
  • 由于分区的固定设置,consumer消费的时候也是和partition一一对应的,如果consumer数目大于partition的数据, 就会出现consumer处于空闲的状态如果partition数据大于consumer的数据就会出现部分consumer繁忙的状况,扩展性并不是特别理想,如果生产者数据量激增导致Broker消息积压实际上没有比较好的方法处理(当然可以在客户端进行多线程消费,不过这需要使用者进行额外的开发)
  • 作为消息队列功能比较简陋,如为了提高生产者发送的性能,引入了RecordBatch会导致消息有一定程度的延迟,实时性不好,还有延迟队列、死信队列等功能都需要用户自己实现
  • RocketMQ消息模型

    RocketMQ架构 & 存储结构

    RocketMQ的架构和存储结构可以见: RocketMQ磁盘存储

    RocketMQ集群主要由四个部分组成:NameServer集群,Producer,Consumer,Broker集群。

    集群架构和Kafka相比有两个比较大的区别:

  • RocketMQ自己实现了注册中心NameServer,用于管理整个集群,替代了Kafka中的Zk集群,而且Producer和Consumer都会直接从NameServer集群中获取元信息,Broker集群只需要做好数据的读写即可
  • RocketMQ的Broker是有主从区别的,所以RocketMQ的主从同步是Broker维度的(在RocketMQ里叫HA,即高可用),所以在RocketMQ里面没有Kafka副本的概念
  • Master Broker支持读和写,Slave Broker仅支持读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读,当一个Master机器出现故障后,Consumer仍能从Slave消费,不影响Consumer程序。在创建Topic的时候,把Topic的多个Message Queue创建在多个 Broker组上(相同Broker名称不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。

    在RocketMQ中对应的Kafka的分区Partition概念的是队列,物理实现中对应的ConsumerQueue日志。

    RocketMQ生产消费流程

    RocketMq主要也是有三种日志组成:CommitLog,ConsumerQueue,IndexFile,RocketMQ借鉴了Kafka日志分段的概念(LogSegment),CommitLog,ConsumerQueue,IndexFile都会拆分成指定大小的小文件方便进行读写操作。

    • CommitLog: 存储的是具体的消息数据,与Kafka不同在同一个Broker中所有Topic的日志都会存储在同一个CommitLog文件中,这样可以在分区数很多时更好的实现文件顺序读写
    • ConsumerQueue:消息的逻辑队列,存储的是指向物理存储的地址,每个 Topic下的每个 Message Queue都有一个对应的 ConsumeQueue文件,是一个二级索引的实现,ConsumeQueue主要用于消息消费时,消费者拉取消息使用
    • IndexFile为全局索引,不区分主题队列,所有消息索引在一个文件中,用于实现消息的关键字查询

    写入消息

    RocketMQ Broker端接收到消息后会顺序写入CommitLog中,通过ReputMessageService定时任务会每隔1s执行一次利用CommitLog中的数据异步构建ConsumerQueue和IndexFile。

    Commitlog在消息写入之后,由专门的线程产生消息转发任务从CommitLog中一条一条的读消息,同步到ConsumerQueue中的只有commitlog offset,size,tag hashcode这三个信息。ConsumerQueue文件并不是像commitLog一样文件直接堆放在一起,而是在consumerQueue目录下,根据topic/queueId,再分了两层目录,提升了检索的效率。

    消费消息

    ConsumerQueue队列和消费者是一一对应的,RocketMQ从ConsumerQueue索引文件查找消息的真实物理地址,然后再从CommitLog中对应位置查找消息并返回给客户端。

    虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取作为cache存到内存中,加速后续的读取速度。

    ConsumeQueue每个索引项仅20B,1G的空间就能存储6000w个索引项,在实际情况中大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度 。

    总结

    RocketMQ设计中吸收了很多Kafka的成功经验,并且从设计之初就是作为一个纯粹的消息队列,所以RocketMQ原生就支持MQ的高级功能,作为后来者RocketMQ也有很多可取之处

  • Broker消息存储CommitLog在Broker中是全局共享的,所有的Topic接受的消息都会顺序写入到一个文件,即使同一个Broker中有上千个Topic,也不会因此导致磁盘的随机读写,这样可以支持更多Topic(这里需要说明下,Kafka设计的目的是用于大数据日志传输,因此在一个集群中Topic的数量不会很多,但是单个Topic的消息是非常庞大的,而在业务系统中正好相反,Topic数量很多,而每个Topic支持的消息数并一定会非常多)
  • RocketMQ中消费者和ConsumerQueue对应的,二者关系正如Kafka消费者和分区,在Kafka里分区是一个比较重的资源,增加太多分区需要考虑整个集群的处理能力,当高峰期过,如果想缩容消费者也比较麻烦,因为分区只能增加,不能减少。Kafka的分区是在不同的物理机器上,而RocketMQ是逻辑分区,用的队列形式,扩容相对比较简单,但是缩容需要里面的数据全部消费完后,缩容操作实践意义不大。
  • 虽然RocketMQ可以通过对ConsumerQueue扩容增加消费能力,但是原有的ConsumerQueue并不会新ConsumerQueue中,所以对于历史积压消息也没有很好的效果。

    QMQ消息模型

    QMQ集群架构

    主要组件

    • MetaServer提供集群管理和集群发现的作用,类比RocketMQ的NameServer以及Kafka的Controller + ZK
    • Server提供实时消息服务,可以类比成Broker
    • DelayServer 提供延时/定时消息服务,延时消息先在DelayServer排队,时间到之后再发送给Server(基于时间轮)
    • Producer 消息生产者
    • Consumer 消息消费者

    QMQ集群交互流程,其中1,2是集群启动流程,37是生产者消息发送过程,811是消费者消息消费过程

  • DelayServer向MetaServer注册
  • 实时Server 向MetaServer注册
  • Producer在发送消息前需要询问MetaServer获取server list
  • MetaServer返回server list给producer(根据producer请求的消息类型返回不同的server list)
  • Producer发送延时/定时消息
  • 延时时间已到,MetaServer将消息投递给实时Server
  • Producer发送实时消息
  • Consumer需要拉取消息,在拉取之前向MetaServer获取server list(只会获取实时server的list)
  • MetaServer返回server list给Consumer
  • Consumer向实时Server发起pull请求
  • 实时Server将消息返回给Consumer
  • 在QMQ中主题命名为Subject,其实就是对应Topic。

    QMQ存储结构

    QMQ存储结构可以概括为三部分:MessageLog, ConsumerLog,PullLog。

    Message Log

    Message Log存储的是所有Subject的日志,单个文件大小默认为1GB(即一个Segment)。如上图所示,每个方格表示一条消息,方格外的数字表示消息在Message Log中的位移(逻辑位移)。

    字段 大小 说明
    magic code 4 魔数,当前是V3版本消息,0xA1B2C302
    attributes 1 属性标志位,1. 1,表示当前这条是没有实际的payload一般是用于数据填充,此类型消息只有magic code,attributes,timestamp三个字段,并且这条消息后面都会添0用于补齐整个LogSegment文件(使其大小达到1GB)
  • 0,表示当前消息有实际的payload,正常的消息都是0 |
    | timestamp | 8 | 生成当前这条消息是的时间戳,毫秒级, 使用的System.currentTimeMillis() |
    | message logical offset | 8 | 当前这条消息在ConsumerLog中的顺序,即MessageLog内部每个Subject内部消息的顺序 |
    | subject size | 2 | Subject长度 |
    | subject | subject size | 存储Subject的名称,长度为subject size字段的取值 |
    | payload crc32 | 8 | 消息体的CRC校验值 |
    | payload size | 4 | 消息体长度 |
    | payload | payload size | 存储具体的消息体,长度为payload size字段的取值 |
  • 由上可知,

  • 对于正常的消息,每条消息的大小为 4 + 1 + 8 + 8 + 2 + 8 + 4 + subjectSize + payloadSize,所以每条消息的大小仅由Subject名称的长度和消息体的长度来确定;
  • 对于非正常的消息,每条消息的大小为 4 + 1 + 8 = 13,并且直到文件尾部都会补0,当LogSegment文件不能存放一整条消息时就会在文件尾部的消息写入一条这样的消息。
  • Consumer Log

    Consumer Log可以理解为每个Subject的一个Partition,一个Subject对应一个Consumer Log。每个方格表示Subject一条消息,方格内的数字表示消息在Message Log中的位移(逻辑位移),方格外的数字表示消息在Consumer Log的位移(逻辑位移)。

    ConsumerLog有v1和v2两个版本,这里只关注v2版本的日志存储格式。

    字段 大小 说明
    magic code 2 魔数,当前是V2版本,0xA3B2
    payload type 2 Message Log索引,固定值为0xC10
    timestamp 8 生成当前这条消息是的时间戳,毫秒级, 使用的System.currentTimeMillis()
    wrote offset 8 写入位移,对应MessagLog中的位移
    wrote bytes 4 写入字节数
    header size 2 头部大小
    padding1 2 用于填充消息大小,凑足32个字节,全部填充0
    padding2 4

    Pull Log

    Pull Log与 consumer 一一对应,是消费者的消费记录文件。Subject+ConsumerGroup+ConsumerId为维度,保留了每一个消费者的历史消费记录,这样可以在消费者重启之后快速定位到之前消费的位置。每个方格表示消费者消费的一条消息,方格内的数字表示消息在Consumer Log中的位移(逻辑位移),方格外的数字表示消费者的消费位移。

    QMQ通过添加一层拉取的Pull Log来动态映射Consumer与Partition(Consumer Log)的逻辑关系,这样不仅解决了Consumer的动态扩容缩容问题,还可以继续使用一个 offset 表示消费进度。

    字段 大小 说明
    message sequence 8 Consumer Log的Sequence号

    总结

    RocketMQ在Kafka的基础上做了一个中间层逻辑队列ConsumerQueue,QMQ则是更近一步在RocketMQ上基础上引入了PullLog中间层,彻底实现队列和消费者的解耦。回到之前说过的那个问题,如果生产者消息激增,QMQ可以扩展ConsumerLog队列,从而提升消息写入性能,如果增加消费者会动态生成新的PullLog,不会出现Kafka和RocketMQ这种队列和消费者1:1,对于消费者只能干等的场景。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论