Redis Stream 数据结构实现原理真的很强

2023年 9月 13日 105.9k 0

你好,我是码哥,一个拥抱硬核技术和对象,面向人民币编程的男人,设置星标不迷路。

我在【Redis 使用 List 实现消息队列的利与弊】说过使用 List 实现消息队列有很多局限性。

  • 没有 ACK 机制。
  • 没有类似 Kafka 的 ConsumerGroup 消费组概念。
  • 消息堆积。
  • List 是线性结构,查询指定数据需要遍历整个列表。

1、是什么

Stream 是 Redis 5.0 版本专门为消息队列设计的数据类型,借鉴了 Kafka 的 Consume Group 设计思路,提供了消费组概念。

同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。

以下几个是 Stream 类型的主要特性。

  • 使用 Radix Tree 和 listpack 结构来存储消息。
  • 消息 ID 序列化生成。
  • 借鉴 Kafka Consume Group 的概念,多个消费者划分到不同的 Consume Group 中,消费同一个 Streams,同一个 Consume Group 的多个消费者可以一起并行但不重复消费,提升消费能力。
  • 支持多播(多对多),阻塞和非阻塞读取。
  • ACK 确认机制,保证了消息至少被消费一次。
  • 可设置消息保存上限阈值,我会把历史消息丢弃,防止内存占用过大。

需要注意的是,Redis Stream 是一种超轻量级的 MQ,并没有完全实现消息队列的所有设计要点,所以它的使用场景需要考虑业务的数据量和对性能、可靠性的需求。

适合系统消息量不大,容忍数据丢失,使用 Redis Stream 作为消息队列就能享受高性能快速读写消息的优势。

2、修炼心法

每个 Stream 都有一个唯一的名称,作为 Stream 在 Redis 的 key,在首次使用 xadd 指令添加消息的时候会自动创建。

可以看到 Stream 在一个 Redix Tree 树上,树上存储的是消息 ID,每个消息 ID 对应的消息通过一个指针指向 listpack。

Stream 流就像是一个仅追加内容的消息链表,把消息一个个串起来,每个消息都有一个唯一的 ID 和消息内容,消息内容则由多个 field/value 键值对组成。底层使用 Radix Tree 和 listpack 数据结构存储数据。

为了便于理解,我画了一张图,并对 Radix Tree 的存储数据做了下变形,使用列表来体现 Stream 中消息的逻辑有序性。

这张图涉及很多概念,但是你不要慌。我一步步拆开说,最后你再回头看就懂了。

先带你屡下全局思路。

  • Consumer Group:消费组,每个消费组可以有一个或者多个消费者,消费者之间是竞争关系。不同消费组的消费者之间无任何关系。
  • *pel,全称是 Pending Entries List,记录了当前被客户端读取但是还没有 ack(Acknowledge character 确认字符)的消息。如果客户端没有 ack,这个变量的消息 ID 会越来越多。这是一个核心数据结构,用来确保客户端至少消费消息一次。

Stream 结构

Streams 结构的源码定义在 stream.h 源码中的 stream 结构体中。

typedef struct stream {
    rax *rax;
    uint64_t length;
    streamID last_id;
    streamID first_id;
    streamID max_deleted_entry_id;
    uint64_t entries_added;
    rax *cgroups;
} stream;

typedef struct streamID {
    uint64_t ms;
    uint64_t seq;
} streamID;
  • *rax,是一个 rax 的指针,指向一个 Radix Tree,key 存储消息 ID,value 实际上指向一个 listpack 数据结构,存储了多条消息,每条消息的 ID 都大于等于 这个 key 的消息 ID。
  • length,该 Stream 的消息条数。
  • streamID结构体,消息 ID 抽象,一共占 128 位,内部维护了毫秒时间戳(字段 ms);一个毫秒内的自增序号(字段 seq),用于区分同一毫秒内插入多条消息。
  • last_id,当前 Stream 最后一条消息的 ID。
  • first_id,当前 Stream 第一条消息的 ID。
  • max_deleted_entry_id,当前 Stream 被删除的最大的消息 ID。
  • entries_added,总共有多少条消息添加到 Stream 中,entries_added = 已删除消息条数 + 未删除消息条数。
  • *cgroups,rax 指针,也指向一个 Radix Tree ,记录当前 Stream 的所有 Consume Group,每个 Consume Group 的名称都是唯一标识,作为 Radix Tree 的 key,Consumer Group 实例作为 value。

Consumer Group

Consumer Group 由 streamCG 结构体定义,每个 Stream 可以有多个 Consumer Group,一个消费组可以有多个消费者同时对组内消息进行消费。

/* Consumer group. */
typedef struct streamCG {
    streamID last_id;
    long long entries_read;
    rax *pel;
    rax *consumers;
} streamCG;
  • last_id,表示该消费组的消费者已经读取但还未 ACK 的最后一条消息 ID。
  • *pel,是 pending entries list 简写,指向一个 Radix Tree 的指针,保存着 Consumer group 中所有消费者读取但还未 ACK 确认的消息,就是这玩意实现了 ACK 机制。该树的 key 是消息 ID,value 关联一个 streamNACK 实例。
  • *consumers, Radix Tree 指针,表示消费组中的所有消费者,key 是消费者名称,value 指向一个 streamConsumer 实例。

streamNACK

streamCG -> *pel 对应的 value 是一个 streamNACK 实例,用于抽象消费者已经读取,但是未 ACK 的消息 ID 相关信息。

/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
    mstime_t delivery_time;
    uint64_t delivery_count;
    streamConsumer *consumer;
} streamNACK;
  • delivery_time,该消息最后一次推送给 Consumer 的时间戳。
  • delivery_count,消息被推送次数。
  • *consumer,消息推送的 Consumer 客户端。

streamConsumer

Consumer Group 中对 Consumer 的抽象。

/* A specific consumer in a consumer group.  */
typedef struct streamConsumer {
    mstime_t seen_time;
    sds name;
    rax *pel;
} streamConsumer;
  • seen_time,消费者最近一次被激活的时间戳。
  • name,消费者名称。
  • *pel, Radix Tree 指针,对于同一个消息而言,``streamCG -> pel与streamConsumer -> pel的streamNACK` 实例是同一个。

最后来一张图,便于你理解。

肖材积:“Redis 你好,Stream 如何结合 Radix Tree 和 listpack 结构来存储消息?为什么不使用散列表来存储,消息 ID 作为散列表的 key,散列表的 value 存储消息键值对内容。’”

在回答之前,先插入几条消息到 Stream,让你对 Stream 消息的存储格式有个大体认知。

该命令的语法如下。

XADD key id field value [field value ...]

Stream 中的每个消息可以包含不同数量的多个键值对,写入消息成功后,我会把消息的 ID 返回给客户端。

执行如下指令把用户购买书籍的下单消息存放到 hotlist:books队列,消息内容主要由 payerID、amount 和 orderID。

> XADD hotlist:books * payerID 1 amount 69.00 orderID 9
1679218539571-0
> XADD hotlist:books * payerID 1 amount 36.00 orderID 15
1679218572182-0
> XADD hotlist:books * payerID 2 amount 99.00 orderID 88
1679218588426-0
> XADD hotlist:books * payerID 3 amount 68.00 orderID 80
1679218604492-0

hotlist:books 是 Stream 的名称,后面的 “*” 表示让 Redis 为插入的消息自动生成一个唯一 ID,你也可以自定义。

消息 ID 由两部分组成。

  • 当前毫秒内的时间戳。
  • 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。

肖材积:“如何理解 Stream 是一种只执行追加操作(append only)的数据结构?”

通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将 Stream 变成了一种只执行追加操作(append only)的数据结构。

用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。

肖材积:“插入的消息 ID 大部分相同,比如这四条消息的 ID 都是 1679218 前缀。另外,每条消息键值对的键通常都是一样的,比如这四条消息的键都是 payerID、amount 和 orderID。使用散列表存储的话会很多冗余数据,你这么抠门,所以不使用散列表对不对?”

没毛病,小老弟很聪明。为了节省内存,我使用了 Radix Tree 和 listpack。Radix Tree 的 key 存储消息 ID,value 使用 listpack 数据结构存储多个消息, listapck 中的消息 ID 都大于等于 key 存储的消息 ID。

我在前面已经讲过 listpack,这是一个紧凑型列表,非常节省内存。而 Radix Tree 数据结构的最大特点是适合保存具有相同前缀的数据,从而达到节省内存。

到底 Radix Tree 是怎样的数据结构,继续往下看。

Radix Tree

Radix Tree,也被称为 Radix Trie,或者 Compact Prefix Tree),用于高效地存储和查找字符串集合。它将字符串按照前缀拆分成一个个字符,并将每个字符作为一个节点存储在树中。

当插入一个键值对时,Redis 会将键按照字符拆分成一个个字符,并根据字符在 Radix tree 中的位置找到合适的节点,如果该节点不存在,则创建新节点并添加到 Radix tree 中。

当所有字符都添加完毕后,将值对象指针保存到最后一个节点中。当查询一个键时,Redis 按照字符顺序遍历 Radix tree,如果发现某个字符不存在于树中,则键不存在;否则,如果最后一个节点表示一个完整的键,则返回对应的值对象。

如下图展示一个简单的前缀树,将根节点到叶子节点的路径对应字符拼接起来,就得到了两个 key(“他说碉堡了”、“他说碉炸了”)。

你应该发现了,这两个 key 拥有公共前缀(他说碉),前缀树实现了共享使用,这样就可以避免相同字符串重复存储。如果采用散列表的保存方式,那个 key 的相同前缀就会被多次存储,导致内存浪费。

Radix Tree 改进

每个节点只保存一个字符,一是会浪费内存空间,二是在进行查询时,还需要逐一匹配每个节点表示的字符,对查询性能也会造成影响。

所以,Redis 并没有直接使用标准前缀树,而是做了一次变种——Compact Prefix Tree(压缩前缀树)。通俗来说,当多个 key 具有相同的前缀时,那就将相同前缀的字符串合并在一个共享节点中,从而减少存储空间。

如下几个 key(test、toaster、toasting、slow、slowly)在 Radix Tree 上的布局。

由于 Compact Prefix Tree 可以共享相同前缀的节点,所以在存储一组具有相同前缀的键时,Redis 的 Radix tree 比其他数据结构(如哈希表)具有更低的空间消耗和更快的查询速度。

Radix Tree 节点的数据结构由 rax.h文件中的 raxNode 定义。

typedef struct raxNode {
    uint32_t iskey:1;
    uint32_t isnull:1;
    uint32_t iscompr:1;
    uint32_t size:29;
    unsigned char data[];
} raxNode;
  • iskey:从 Radix Tree 根节点到当前节点组成的字符串是否是一个完整的 key。是的话 iskey 的值为 1。
  • isnull:当前节点是否为空节点,如果当前节点是空节点的话,就不需要为该节点分配指向 value 的指针内存。
  • iscompr,是否为压缩节点。
  • size,当前节点的大小,具体指会根据节点类型而改变。如果是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示节点的子节点个数。
  • data[],实际存储的数据,根据节点类型不同而有所不同。
  • 压缩节点,data 数据包括子节点对应的字符、指向子节点的指针,节点为最终 key 对应的 value 指针。
  • 压缩节点,data 数据包含子节点对应的合并字符串、指向子节点的指针,以及节点为最终 key 的 value 指针。
  • value 指针指向一个 listpack 实例,里面保存了消息实际内容。

Radix Tree 最大的特点就是适合保存具有相同前缀的数据,实现节省内存的目标,以及支持范围查找。而这个就是 Stream 采用 Radix Tree 作为底层数据结构的原因。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论