Spring Boot进阶:Redis魔法:用发布订阅功能打造高效消息队列!

2023年 9月 12日 88.9k 0

1. 前言🔥

        话说,玩过MQ的同学可能都知道【发布&订阅】模式,不就是一种消息传递方式嘛;如果没玩过,那也不打紧,下文我会简单做个科普。但是对于Redis如何实现MQ的【发布&订阅】功能?这才是问题的关键,有的同学就说“压根没玩过呀!不造” ,哈哈,bug菌既然敢写便有法子解决,诸位还请稍安勿躁,继续往下看。

        那么,具体如何实现呢?这将又会是干货满满的一期,全程无尿点不废话只抓重点教,具有非常好的学习效果,拿好小板凳准备就坐!希望学习的过程中大家认真听好好学,学习的途中有任何不清楚或疑问的地方皆可评论区留言或私信,bug菌将第一时间给予解惑,那么废话不多说,直接开整!Fighting!! 

2 . 环境说明🔥

本地的开发环境:

  • 开发工具:IDEA 2021.3
  • JDK版本: JDK 1.8
  • Spring Boot版本:2.3.1 RELEASE
  • Maven版本:3.8.2

3. 发布订阅模式🔥 

        如果有的同学实在头疼不想看枯燥文绉绉的长篇大论,可以直接跳过第3点,看第4点,手把手带你实战Redis的发布订阅模式。 

3.1 何为发布订阅模式?

        发布订阅模式,是一种消息传递模式,其中发送者(发布者)发送消息,而没有明确的接收者(订阅者)。发布者将消息发送到一个或多个主题(频道),订阅者通过订阅特定主题来接收相关消息。此模式的主要目的是解耦发布者和订阅者,使它们能够独立地进行操作。这种模式通常用于事件驱动系统,其中多个组件需要根据事件进行操作。

3.2 何为观察者模式?

        观察者模式(Observer Pattern),是一种软件设计模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,当主题对象状态发生变化时,观察者对象会收到通知并自动更新。 

        看到这里,很多同学就很有可能会将二则搞混淆,会觉得发布订阅模式中的两个概念与观察者模式中的两个概念似乎干的是一样的事?所以?Publisher就是观察者模式中的Subject?而Subscriber就是观察者模式中的Observer?为了一探究竟,咱们接着往下看。

3.3 发布订阅模式[对比]观察者模式

        从这二者的角色任务来说确实是非常相似的,但是从这二者的实现架构上来说却是非常有区别的,有一个核心不同点!如下我用两张图来示意它两究竟不同在哪里,以此来辅助大家理解。

如下是观察者模式工作示意图:

观察者模式

 如下是发布订阅模式运作示意图:

发布订阅模式

        如上两张图,大家可以很清晰的看到一个非常大的区别:发布订阅模式在两个角色中间是一个中间角色(缓冲区)来过渡的,发布者并不直接与订阅者产生交互。

        大家回想一下【生产者消费者模式】,这个中间过渡区域对应的就是缓冲区。因为这个缓冲区的存在,发布者与订阅者的工作就可以实现大批的解耦作用。发布者不会因为订阅者处理速度慢,从而影响自己的发布任务效率,它只需要稳定&快速生产即可。而订阅者也不用太担心一时来不及处理而导致问题堆积处理不过来,因为有缓冲区在,便可以一点点排队来处理这些消息,这也就是我们所常说的“削峰填谷”。

        而我们所熟知的RabbitMQ、Kafka、RocketMQ等这些MQ中间件的本质其实就是实现【发布订阅模式】中的这个中间缓冲区。而Redis也提供了简单的发布订阅模式功能,当我们有一些简单需求的时候,选择它也再合适不错!

        届此,如果你已经对这些概念有了一定的理解与感悟,那么接下来就跟我一起动手,共同实践探索一番吧,加深你对此的掌握,正所谓:“实践出真知,行动助成长”。

4. 集成Redis 实现消息队列模式🔥 

        如下我先给同学们概括下,针对Spring Boot项目,如何使用Redis实现发布订阅模式的一些重要步骤,同学们请看:

  • 配置redis连接等信息。
  • 添加请求入口。
  • 添加消息发布类。
  • 配置消息监听类(实现MessageListener接口,重写onMessage())。
  • 添加监听容器(配置RedisMessageListenerContainer的bean)。
  • 订阅单频道。
  • 向频道发布消息。
  • 消息发布测试&消息监听。
  • 如何订阅多频道。
  •         接着我们就开始动手吧!实践出真知。

    4.1 创建Spring Boot应用

    首先,我们先创建个基础的Spring Boot项目,如果还不会点这里,此处就不详细赘述啦。

    4.2 集成Redis

    在你的pom.xml中引入redis的依赖包,示例代码如下:

            
            
                org.springframework.boot
                spring-boot-starter-data-redis
                2.4.5
            
    

    4.3 yaml 配置Redis

            配置Redis连接等相关信息,这里同学们可以闭眼cv,示例代码如下:

    #redis配置
    Spring:
      redis:
        database: 0    #Redis数据库索引(默认为0)
        host: 127.0.0.1  #redis服务器ip,由于我是搭建在本地,固指向本地ip
        port: 6379  #redis服务器连接端口
        password:    #redis服务器连接密码(默认为空)
        # 连接池配置
        jedis.pool:
          max-active: 20      #连接池最大连接数(使用负值表示没有限制)
          max-wait: -1     #连接池最大阻塞等待时间(使用负值表示没有限制)
          max-idle: 10        #连接池中的最大空闲连接
          min-idle: 0         #连接池中的最小空闲连接
          timeout: 1000      #连接超时时间(毫秒)。我设置的是1秒
    

    4.4 创建Controller 

            创建一个接口,目的是用于自定义发布消息,包括从请求中指定监听的通道及消息。 示例代码如下:

    package com.example.demo.controller;
     
    import com.example.demo.component.redis.mediat.RedisMediator;
    import com.example.demo.component.redis.message.Publisher;
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiParam;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
     
    /**
     * redis相关控制器
     */
    @RestController
    @RequestMapping("/redis")
    @Api(tags = "redis相关控制器", description = "redis相关控制器")
    public class RedisController {
     
        @Autowired
        private Publisher publisher;
     
        /**
         * 模拟发布消息
         */
        @GetMapping("/publish")
        public String publish(@ApiParam("通道名") @RequestParam String channel,
                              @ApiParam("消息体") @RequestParam String message) {
            // 发送消息
            return publisher.sendMessage(channel, message);
        }
     
     
    }
    

    4.5 定义消息生产者

            创建一个消息生产者,注入StringRedisTemplate,用convertAndSend()进行消息发送,示例代码如下:

    package com.example.demo.component.redis.message;
     
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
     
    /**
     * redis:消息生产者
     */
    @Service
    public class Publisher {
     
        @Autowired
        private StringRedisTemplate redisTemplate;
     
        public String sendMessage(String channel, String message) {
            try {
                //消息发送
                redisTemplate.convertAndSend(channel, message);
                return "消息发送成功了!";
            } catch (Exception e) {
                e.printStackTrace();
                return "消息发送失败了!";
            }
        }
    }
    

    4.6 创建消息接收者

            配置消息监听类,实现MessageListener接口,重写onMessage()方法,示例代码如下:

    package com.example.demo.component.redis.message;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
     
    /**
     * redis:接收消息者
     */
    @Component
    public class Receiver implements MessageListener {
        private static Logger logger = LoggerFactory.getLogger(Receiver.class);
     
        @Autowired
        private StringRedisTemplate redisTemplate;
     
        @Override
        public void onMessage(Message message, byte[] bytes) {
     
            // 获取消息体
            byte[] messageBody = message.getBody();
            // 使用值序列化器转换
            Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
            // 获取监听的频道
            byte[] channelByte = message.getChannel();
            // 使用字符串序列化器转换
            Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
     
            // 收到消息的处理逻辑,演示的话这里就只做打印处理
            logger.info("---频道---: " + channel);
            // 收到消息的处理逻辑,演示的话这里就只做打印处理
            logger.info("---消息内容---: " + msg);
        }
    }
    

    4.7 配置监听容器并订阅频道

            然后,在RedisConfig中配置一些监听容器并订阅频道,代码如下:

    package com.example.demo.config;
     
    import com.example.demo.component.redis.message.Receiver;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisOperations;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
     
    /**
     * redis配置类
     */
    @Configuration
    @ConditionalOnClass(RedisOperations.class)
    @EnableConfigurationProperties(RedisProperties.class)
    public class RedisConfig {
     
        /**
         * 字符串Template
         *
         * @param connection redis连接
         */
        @Bean
        @ConditionalOnMissingBean(StringRedisTemplate.class)
        public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connection) {
            StringRedisTemplate template = new StringRedisTemplate();
            template.setConnectionFactory(connection);
            return template;
        }
     
        /**
         * 配置
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                       Receiver listener) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
            //订阅频道 bug_music 和 bug_news
            //这个container可以添加多个redis频道(监听多个)
            container.addMessageListener(listener, new ChannelTopic("bug_music"));
    //        container.addMessageListener(listener, new ChannelTopic("bug_news"));
            return container;
        }
     
    }
    

    4.8 模拟消息发布及监听

            功能代码都写好了,我们接下来进行测试一下,通过Swagger、PostMan等测试途径,请求消息发布接口,这里图方便,就直接通过浏览器发起请求,例如:

            发起请求成功了,成功把对应的返回体通过str串进行了渲染,示例截图如下: 

            那消息是否监听到了呢?这里我们只需要检查下控制台的日志输出,示例截图如下: 

            很明显是成功拿到了通道为" bug_music "所发布的消息,我们继续发布一条消息,再看!

            倘若我们换个通道名继续发布消息?那么是否还能被监听到呢?

    http://localhost:8080/redis/publish?channel=bug_news&message=每日新闻.doc

            答案肯定是监听不到,因为我们在监听容器中只指定了监听通道为【bug_music】的消息,那么我想监听通道为【bug_news】的消息又要怎么实现呢?

    4.9 如何监听多频道?

    监听多个频道,这里我提供两种方式

    4.9.1 配置方式1

        /**
         * 配置
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                       Receiver listener) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
            //订阅频道 bug_music 和 bug_news
            //这个container可以添加多个redis频道(监听多个)
            container.addMessageListener(listener, new ChannelTopic("bug_music"));
            container.addMessageListener(listener, new ChannelTopic("bug_news"));
            return container;
        }
    

            你只需要对container继续添加指定通道名即可,示例代码可参考如上。分别配置了【bug_music】与【bug_news】,接着我们重启项目测试一波,验证是否能拿到两个通道的消息,测试如下:

            很明显都成功拿到了【bug_music】与【bug_news】通道所发布的消息; 

    4.9.2 配置方式2

            对比方式1,方式2是针对批量监听的情况下,总不能想监听100个通道的消息就配置100条container吧,这也太拉跨了吧。肯定是有办法的,在配置监听容器添加订阅频道时,除了使用ChannelTopic 外,还可以使用通配符的形式订阅消息频道,如可以通过订阅 【bug_*】即表示可同时订阅【bug_music】与【bug_news】,这里我还是针对Redis的监听容器进行部分代码修改,示例代码如下:

    /**
     * redis配置类
     */
    @Configuration
    @ConditionalOnClass(RedisOperations.class)
    @EnableConfigurationProperties(RedisProperties.class)
    public class RedisConfig {
     
        /**
         * 配置
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                       Receiver listener) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
     
            container.addMessageListener(listener,  patternTopic());
            return container;
        }
     
        /**
         * 订阅匹配的多个频道
         */
        @Bean
        public PatternTopic patternTopic() {
            return new PatternTopic("bug_*");
        }
     
    }
    

    经过多请求测试,示例截图结果如下:

            很明显也是成功拿到了【bug_music】与【bug_news】通道所发布的消息。

    4.10 小结

            Redis和MQ都支持订阅与发布功能,但是它们的订阅与发布有以下区别:

  • Redis的订阅与发布是同步的,而MQ的订阅与发布是异步的。在Redis中,当一个客户端发布消息时,所有订阅了该频道的客户端都会立即收到该消息,而在MQ中,发布者发布消息后,订阅者在接收到消息前需要等待中间件的处理。
  • Redis的订阅与发布是一对多的,而MQ的订阅与发布是多对多的。在Redis中,一个客户端可以订阅多个频道,一个频道也可以有多个订阅者,而在MQ中,一个发布者可以发布消息到多个队列,多个订阅者可以从同一个队列接收消息。
  • Redis的订阅与发布是短暂的,而MQ的订阅与发布是持久的。在Redis中,当一个订阅客户端断开连接后,它订阅的频道也会自动取消订阅,而在MQ中,即使订阅者下线了,消息依然可以被持久化存储,直到订阅者重新上线并接收到消息为止。
  • Redis的订阅与发布是基于主题的,而MQ的订阅与发布是基于消息队列的。在Redis中,发布者发布消息到一个主题,订阅者可以订阅该主题并接收所有消息;而在MQ中,发布者发布消息到一个队列,订阅者需要从该队列中接收消息,接收到的消息不会再次发送给其他订阅者。
  •         总的来说,Redis的订阅和发布适用于实时的消息传递,而MQ的订阅和发布适用于异步的消息传递和解耦。当然,选择适合自己的消息队列还需要结合自己的业务场景来判断决定,需要考虑其对业务的侵入性、对技术水平的要求、业务可靠性、及性能等多方面问题,综合考虑下再选择哪一种解决方案。

    5. 热文推荐🔥

    滴~如下推荐【Spring Boot 进阶篇】的学习大纲,请小伙伴们注意查收。

    Spring Boot进阶(01):Spring Boot 集成 Redis,实现缓存自由

    Spring Boot进阶(02):使用Validation进行参数校验

    Spring Boot进阶(03):如何使用MyBatis-Plus实现字段的自动填充

    Spring Boot进阶(04):如何使用MyBatis-Plus快速实现自定义sql分页

    Spring Boot进阶(05):Spring Boot 整合RabbitMq,实现消息队列服务

    Spring Boot进阶(06):Windows10系统搭建 RabbitMq Server 服务端

    Spring Boot进阶(07):集成EasyPoi,实现Excel/Word的导入导出

    Spring Boot进阶(08):集成EasyPoi,实现Excel/Word携带图片导出

    Spring Boot进阶(09):集成EasyPoi,实现Excel文件多sheet导入导出

    Spring Boot进阶(10):集成EasyPoi,实现Excel模板导出成PDF文件

    Spring Boot进阶(11):Spring Boot 如何实现纯文本转成.csv格式文件?

    Spring Boot进阶(12):Spring Boot 如何获取Excel sheet页的数量?

    Spring Boot进阶(13):Spring Boot 如何获取@ApiModelProperty(value = “序列号“, name = “uuid“)中的value值name值?

    Spring Boot进阶(14):Spring Boot 如何手动连接库并获取指定表结构?一文教会你

    Spring Boot进阶(15):根据数据库连接信息指定分页查询表结构信息

    Spring Boot进阶(16):Spring Boot 如何通过Redis实现手机号验证码功能?

    Spring Boot进阶(17):Spring Boot如何在swagger2中配置header请求头等参数信息

    Spring Boot进阶(18):SpringBoot如何使用@Scheduled创建定时任务?

    Spring Boot进阶(19):Spring Boot 整合ElasticSearch

    Spring Boot进阶(20):配置Jetty容器

    Spring Boot进阶(21):配置Undertow容器

    Spring Boot进阶(22):Tomcat与Undertow容器性能对比分析

    Spring Boot进阶(23):实现文件上传

    Spring Boot进阶(24):如何快速实现多文件上传?

    Spring Boot进阶(25):文件上传的单元测试怎么写?

    Spring Boot进阶(26):Mybatis 中 resultType、resultMap详解及实战教学

    Spring Boot进阶(27):Spring Boot 整合 kafka(环境搭建+演示)

    Spring Boot进阶(28):Jar包Linux后台启动部署及滚动日志查看,日志输出至实体文件保存

    Spring Boot进阶(29):如何正确使用@PathVariable,@RequestParam、@RequestBody等注解?不会我教你,结合Postman演示

    Spring Boot进阶(30):@RestController和@Controller 注解使用区别,实战演示

    ...

    本项目源码地址:GitHub - luoyong0603/SpringBoot-demo: 该项目代码为《滚雪球学Spring Boot》《Spring Boot进阶实战》专栏知识点讲解案例及相关源码,开源给同学们参考。

    6. 文末🔥

            如果想系统性的学习Spring Boot,小伙伴们直接订阅bug菌专门为大家创建的Spring Boot专栏《滚雪球学Spring Boot》从入门到精通,从无到有,从零到一!以知识点+实例+项目的学习模式由浅入深对Spring Boot框架进行学习&使用。

            如果你有一定的基础却又想精进Spring Boot,那么《Spring Boot进阶实战》将会是你的最好的选择;此栏进行知识点+实例+项目的学习方式全面深入框架剖析及各种高阶玩法,励志打造全网最全最新springboot学习专栏,投资学习自己性价比最高。

            本文涉及所有源代码,均已上传至github开源,供同学们一对一参考,GitHub,同时,原创开源不易,欢迎给个star🌟,想体验下被加Star的感jio,非常感谢 ❗

    相关文章

    服务器端口转发,带你了解服务器端口转发
    服务器开放端口,服务器开放端口的步骤
    产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
    如何使用 WinGet 下载 Microsoft Store 应用
    百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
    百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

    发布评论