民工哥死磕Redis教程(四):发布与订阅(pub/sub)

2023年 7月 10日 32.3k 0

什么是发布订阅?

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 的 subscribe 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端。

☛ 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:民工哥死磕Redis教程(四):发布与订阅(pub/sub)☛ 当有新消息通过 publish 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

为什么要用发布订阅?

熟悉消息中间件的同学都知道,针对消息订阅发布功能,市面上很多大厂使用的是kafkaRabbitMQActiveMQRocketMQ等这几种,redis的订阅发布功能跟这三者相比,相对轻量,针对数据准确和安全性要求没有那么高可以直接使用,适用于小公司。

redis 的List数据类型结构提供了 blpop 、brpop 命令结合 rpush、lpush 命令可以实现消息队列机制,基于双端链表实现的发布与订阅功能

这种方式存在两个局限性:

  • 不能支持一对多的消息分发。
  • 如果生产者生成的速度远远大于消费者消费的速度,易堆积大量未消费的消息

◇ 双端队列图解如下:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

✦ 解析:双端队列模式只能有一个或多个消费者轮着去消费,却不能将消息同时发给其他消费者

◇ 发布/订阅模式图解如下:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

✦ 解析:redis订阅发布模式,生产者生产完消息通过频道分发消息,给订阅了该频道的所有消费

发布/订阅如何使用?

Redis有两种发布/订阅模式:

  • 基于频道(Channel)的发布/订阅
  • 基于模式(pattern)的发布/订阅

操作命令如下民工哥死磕Redis教程(四):发布与订阅(pub/sub)

基于频道(Channel)的发布/订阅

"发布/订阅" 包含2种角色:发布者和订阅者。发布者可以向指定的频道(channel)发送消息;订阅者可以订阅一个或者多个频道(channel),所有订阅此频道的订阅者都会收到此消息。民工哥死磕Redis教程(四):发布与订阅(pub/sub)

  • 订阅者订阅频道 subscribe channel  [channel ...]
--------------------------客户端1(订阅者) :订阅频道 ---------------------
 
# 订阅 “meihuashisan” 和 “csdn” 频道(如果不存在则会创建频道)
127.0.0.1:6379> subscribe meihuashisan csdn 
Reading messages... (press Ctrl-C to quit)
 
1) "subscribe"    -- 返回值类型:表示订阅成功!
2) "meihuashisan" -- 订阅频道的名称
3) (integer) 1    -- 当前客户端已订阅频道的数量
 
1) "subscribe"
2) "csdn"
3) (integer) 2
 
#注意:订阅后,该客户端会一直监听消息,如果发送者有消息发给频道,这里会立刻接收到消息
  • 发布者发布消息 publish channel message
-----------------------客户端2(发布者):发布消息给频道 -------------------

# 给“meihuashisan”这个频道 发送一条消息:“I am meihuashisan”
127.0.0.1:6379> publish meihuashisan "I am meihuashisan"
(integer) 1  # 接收到信息的订阅者数量,无订阅者返回0

客户端2(发布者)发布消息给频道后,此时我们再来观察 客户端1(订阅者)的客户端窗口变化:

--------------------------客户端1(订阅者) :订阅频道 -----------------
127.0.0.1:6379> subscribe meihuashisan csdn 
Reading messages... (press Ctrl-C to quit)
 
1) "subscribe"    -- 返回值类型:表示订阅成功!
2) "meihuashisan" -- 订阅频道的名称
3) (integer) 1    -- 当前客户端已订阅频道的数量
 
1) "subscribe"
2) "csdn"
3) (integer) 2
 
--------------------变化如下:(实时接收到了该频道的发布者的消息)------------
1) "message"           -- 返回值类型:消息
2) "meihuashisan"      -- 来源(从哪个频道发过来的)
3) "I am meihuashisan" -- 消息内容

命令操作图解如下:

注意:如果是先发布消息,再订阅频道,不会收到订阅之前就发布到该频道的消息!民工哥死磕Redis教程(四):发布与订阅(pub/sub)注意:进入订阅状态的客户端,不能使用除了subscribeunsubscribepsubscribe 和 punsubscribe 这四个属于"发布/订阅"之外的命令,否则会报错!

这里的客户端指的是 jedis、lettuce的客户端,redis-cli是无法退出订阅状态的!更多关于 Redis 学习的文章,请参阅:NoSQL 数据库系列之 Redis ,本系列持续更新中。

实现原理

底层通过字典实现。pubsub_channels 是一个字典类型,保存订阅频道的信息:字典的key为订阅的频道, 字典的value是一个链表, 链表中保存了所有订阅该频道的客户端

struct redisServer { 
  /* General */ 
  pid_t pid; 
 
  //省略百十行 
 
  // 将频道映射到已订阅客户端的列表(就是保存客户端和订阅的频道信息)
  dict *pubsub_channels; /* Map channels to list of subscribed clients */ 
}

实现图如下:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

频道订阅:订阅频道时先检查字段内部是否存在;不存在则为当前频道创建一个字典且创建一个链表存储客户端id;否则直接将客户端id插入到链表中。

取消频道订阅:取消时将客户端id从对应的链表中删除;如果删除之后链表已经是空链表了,则将会把这个频道从字典中删除。

发布:首先根据 channel 定位到字典的键, 然后将信息发送给字典值链表中的所有客户端

基于模式(pattern)的发布/订阅

如果有某个/某些模式和该频道匹配,所有订阅这个/这些频道的客户端也同样会收到信息。

图解

下图展示了一个带有频道和模式的例子, 其中 com.ahead.* 频道匹配了 com.ahead.juc 频道和 com.ahead.thread 频道, 并且有不同的客户端分别订阅它们三个,如下图:

当有信息发送到com.ahead.thread 频道时, 信息除了发送给 client 4 和 client 5 之外, 还会发送给订阅 com.ahead.* 频道模式的 client x 和 client y民工哥死磕Redis教程(四):发布与订阅(pub/sub)✦ 解析:反之也是,如果当有消息发送给 com.ahead.juc 频道,消息发送给订阅了 juc 频道的客户端之外,还会发送给订阅了 com.ahead.* 频道的客户端: client x 、client y

通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符。

  • 订阅者订阅频道 psubscribe pattern [pattern ...]
--------------------------客户端1(订阅者) :订阅频道 --------------------
 
#  1. ------------订阅 “a?” "com.*" 2种模式频道--------------
127.0.0.1:6379> psubscribe a? com.*
# 进入订阅状态后处于阻塞,可以按Ctrl+C键退出订阅状态
Reading messages... (press Ctrl-C to quit) 
 
---------------订阅成功-------------------
 
1) "psubscribe"  -- 返回值的类型:显示订阅成功
2) "a?"          -- 订阅的模式
3) (integer) 1   -- 目前已订阅的模式的数量
 
1) "psubscribe"
2) "com.*"
3) (integer) 2
 
---------------接收消息 (已订阅 “a?” "com.*" 两种模式!)-----------------
 
# ---- 发布者第1条命令:publish ahead "hello"
结果:没有接收到消息,匹配失败,不满足 “a?” ,“?”表示一个占位符, a后面的head有4个占位符
 
# ---- 发布者第2条命令:  publish aa "hello" (满足 “a?”)
1) "pmessage" -- 返回值的类型:信息
2) "a?"       -- 信息匹配的模式:a?
3) "aa"       -- 信息本身的目标频道:aa
4) "hello"    -- 信息的内容:"hello"
 
# ---- 发布者第3条命令:publish com.juc "hello2"(满足 “com.*”, *表示任意个占位符)
1) "pmessage" -- 返回值的类型:信息
2) "com.*"    -- 匹配模式:com.*
3) "com.juc"  -- 实际频道:com.juc
4) "hello2"   -- 信息:"hello2"
 
---- 发布者第4条命令:publish com. "hello3"(满足 “com.*”, *表示任意个占位符)
1) "pmessage" -- 返回值的类型:信息
2) "com.*"    -- 匹配模式:com.*
3) "com."     -- 实际频道:com.
4) "hello3"   -- 信息:"hello3"
  • 发布者发布消息   publish channel message
------------------------客户端2(发布者):发布消息给频道 ------------------
 
注意:订阅者已订阅 “a?” "com.*" 两种模式!
 
# 1. ahead 不符合“a?”模式,?表示1个占位符
127.0.0.1:6379> publish ahead "hello"  
(integer) 0    -- 匹配失败,0:无订阅者
 
# 2. aa 符合“a?”模式,?表示1个占位符
127.0.0.1:6379> publish aa "hello"      
(integer) 1
 
# 3. 符合“com.*”模式,*表示任意个占位符
127.0.0.1:6379> publish com.juc "hello2" 
(integer) 1
 
# 4. 符合“com.*”模式,*表示任意个占位符
127.0.0.1:6379> publish com. "hello3" 
(integer) 1

命令操作图解如下:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

实现原理

底层是pubsubPattern节点的链表。

struct redisServer {
    //...
    list *pubsub_patterns; 
    // ...
}
 
// 1303行订阅模式列表结构:
typedef struct pubsubPattern {
    client *client;  -- 订阅模式客户端
    robj *pattern;   -- 被订阅的模式
} pubsubPattern;

实现图如下:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

模式订阅:新增一个pubsub_pattern数据结构添加到链表的最后尾部,同时保存客户端ID。

取消模式订阅:从当前的链表pubsub_pattern结构中删除需要取消的pubsubPattern结构。

使用小结

订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

使用场景
  • 电商中,用户下单成功之后向指定频道发送消息,下游业务订阅支付结果这个频道处理自己相关业务逻辑
  • 粉丝关注功能
  • 文章推送
使用注意
  • 客户端需要及时消费和处理消息。
    • 客户端订阅了channel之后,如果接收消息不及时,可能导致DCS实例消息堆积,当达到消息堆积阈值(默认值为32MB),或者达到某种程度(默认8MB)一段时间(默认为1分钟)后,服务器端会自动断开该客户端连接,避免导致内部内存耗尽。
  • 客户端需要支持重连。
    • 当连接断开之后,客户端需要使用subscribe或者psubscribe重新进行订阅,否则无法继续接收消息。
  • 不建议用于消息可靠性要求高的场景中。
    • Redis的pubsub不是一种可靠的消息系统。当出现客户端连接退出,或者极端情况下服务端发生主备切换时,未消费的消息会被丢弃。

更多关于 Redis 学习的文章,请参阅:NoSQL 数据库系列之 Redis ,本系列持续更新中。

深入理解

我们通过几个问题,来深入理解Redis的订阅发布机制

基于频道(Channel)的发布/订阅如何实现的?

底层是通过字典(图中的pubsub_channels)实现的,这个字典就用于保存订阅频道的信息:字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。

  • 数据结构

比如说,在下图展示的这个 pubsub_channels 示例中, client2 、 client5 和 client1 就订阅了 channel1 , 而其他频道也分别被别的客户端所订阅:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

  • 订阅

当客户端调用 SUBSCRIBE 命令时, 程序就将客户端和要订阅的频道在 pubsub_channels 字典中关联起来。

举个例子,如果客户端 client10086 执行命令 SUBSCRIBE channel1 channel2 channel3 ,那么前面展示的 pubsub_channels 将变成下面这个样子:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

  • 发布

当调用 PUBLISH channel message 命令, 程序首先根据 channel 定位到字典的键, 然后将信息发送给字典值链表中的所有客户端。

比如说,对于以下这个 pubsub_channels 实例, 如果某个客户端执行命令 PUBLISH channel1 "hello moto" ,那么 client2 、 client5 和 client1 三个客户端都将接收到 "hello moto" 信息:

  • 退订

使用 UNSUBSCRIBE 命令可以退订指定的频道, 这个命令执行的是订阅的反操作:它从 pubsub_channels 字典的给定频道(键)中, 删除关于当前客户端的信息, 这样被退订频道的信息就不会再发送给这个客户端。

基于模式(Pattern)的发布/订阅如何实现的?

底层是pubsubPattern节点的链表。

  • 数据结构 redisServer.pubsub_patterns 属性是一个链表,链表中保存着所有和模式相关的信息:
struct redisServer {
    // ...
    list *pubsub_patterns;
    // ...
};

链表中的每个节点都包含一个 redis.h/pubsubPattern 结构:

typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

client 属性保存着订阅模式的客户端,而 pattern 属性则保存着被订阅的模式。

每当调用 PSUBSCRIBE 命令订阅一个模式时, 程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern 结构, 并将该结构添加到 redisServer.pubsub_patterns 链表中。

作为例子,下图展示了一个包含两个模式的 pubsub_patterns 链表, 其中 client123 和 client256 都正在订阅 tweet.shop.* 模式:民工哥死磕Redis教程(四):发布与订阅(pub/sub)

  • 订阅

如果这时客户端 client10086 执行 PSUBSCRIBE broadcast.list.* , 那么 pubsub_patterns 链表将被更新成这样:民工哥死磕Redis教程(四):发布与订阅(pub/sub)通过遍历整个 pubsub_patterns 链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端。

  • 发布

发送信息到模式的工作也是由 PUBLISH 命令进行的, 显然就是匹配模式获得Channels,然后再把消息发给客户端。

  • 退订

使用 PUNSUBSCRIBE 命令可以退订指定的模式, 这个命令执行的是订阅模式的反操作:程序会删除 redisServer.pubsub_patterns 链表中, 所有和被退订模式相关联的 pubsubPattern 结构, 这样客户端就不会再收到和模式相匹配的频道发来的信息。更多关于 Redis 学习的文章,请参阅:NoSQL 数据库系列之 Redis ,本系列持续更新中。

SpringBoot结合Redis发布/订阅实例?

最佳实践是通过RedisTemplate,关键代码如下:

// 发布
redisTemplate.convertAndSend("my_topic_name", "message_content");

// 配置订阅
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(xxxMessageListenerAdapter, "my_topic_name");

总结

1、redis的订阅频道的信息是redis服务器进程自己维持在pubsub_channels链表字典当中。字典的KEY为被订阅的频道,值为订阅的客户端。

2、当发送者发送消息时,redis服务器遍历频道对应的所有客户端,然后将消息发送到所订阅的客户端上。

3、当有信息发送时,除了订阅该频道的客户端会收到消息,以及和订阅了匹配频道的客户端,其它客户端是收不到该信息的。

4、退订频道、退订模式和订阅频道、订阅模式是两组反操作。

应用场景

俗话说的好,知识学得好不好,还得看用到哪。反正笔者看到redis的发布与订阅的模式的特点后,第一时间想到的是可以用来做一个实时聊天系统,还可以用来做分布式架构中写的过程,利用redis的实时发布功能,把要写入的值及时快速的分发到各个写入程序当中,保证分布式架构中数据的完整一致性。再比如博客系统和自媒体平台中,粉丝关注功能,就比如现在我的200000粉丝,当我发布文章时,就可以及时推送文章到粉丝的客户端上。总而言之,应用的场景比较多,需要大家多思考,多交流。

参考来源:blog.csdn.net/w15558056319/article/details/121490953 pdai.tech/md/db/nosql-redis/db-redis-x-pub-sub.html https://www.wenjiangs.com/doc/mt0ueji7b8sc

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论