Leader让我用 Redis 实现消息队列模式?我表示…😨😨

2023年 8月 23日 64.5k 0

👨‍🎓作者:bug菌
✏️博客:CSDN、掘金、infoQ、51CTO等
🎉简介:CSDN|阿里云|华为云|51CTO等社区博客专家,历届博客之星Top30,掘金年度人气作者Top40,51CTO年度博主Top12,掘金 | InfoQ | 51CTO等社区优质创作者,全网粉丝合计15w+ ;硬核微信公众号「猿圈奇妙屋」,欢迎你的加入!免费白嫖最新BAT互联网公司面试题、4000G pdf电子书籍、简历模板等海量资料。

...

✍️温馨提醒:本文字数:1999字, 阅读完需:约 5 分钟

🏆本文收录于《Spring Boot从入门到精通》,专门攻坚指数提升。

本专栏致力打造最硬核 Spring Boot 从零基础到进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中…欢迎大家订阅持续学习。

环境说明:Windows10 + Idea2021.3.2 + Jdk1.8 + SpringBoot 2.3.1.RELEASE

1. 前言🔥

        话说,玩过MQ的同学可能都知道【发布&订阅】模式,不就是一种消息传递方式嘛;如果没玩过,那也不打紧,下文我会简单做个科普。但是对于Redis如何实现MQ的【发布&订阅】功能?这才是问题的关键,有的同学就说“压根没玩过呀!不造” ,哈哈,bug菌既然敢写便有法子解决,诸位还请稍安勿躁,继续往下看。

        那么,具体如何实现呢?这将又会是干货满满的一期,全程无尿点不废话只抓重点教,具有非常好的学习效果,拿好小板凳准备就坐!希望学习的过程中大家认真听好好学,学习的途中有任何不清楚或疑问的地方皆可评论区留言或私信,bug菌将第一时间给予解惑,那么废话不多说,直接开整!Fighting!! 

2. 环境说明🔥

本地的开发环境:

  • 开发工具:IDEA 2021.3
  • JDK版本: JDK 1.8
  • Spring Boot版本:2.3.1 RELEASE
  • Maven版本:3.8.2

3. 发布订阅模式🔥 

        如果有的同学实在头疼不想看枯燥文绉绉的长篇大论,可以直接跳过第3点,看第4点,手把手带你实战Redis的发布订阅模式。 

3.1 何为发布订阅模式?

        发布订阅模式,是一种消息传递模式,其中发送者(发布者)发送消息,而没有明确的接收者(订阅者)。发布者将消息发送到一个或多个主题(频道),订阅者通过订阅特定主题来接收相关消息。此模式的主要目的是解耦发布者和订阅者,使它们能够独立地进行操作。这种模式通常用于事件驱动系统,其中多个组件需要根据事件进行操作。

3.2 何为观察者模式?

        观察者模式(Observer Pattern),是一种软件设计模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,当主题对象状态发生变化时,观察者对象会收到通知并自动更新。 

        看到这里,很多同学就很有可能会将二则搞混淆,会觉得发布订阅模式中的两个概念与观察者模式中的两个概念似乎干的是一样的事?所以?Publisher就是观察者模式中的Subject?而Subscriber就是观察者模式中的Observer?为了一探究竟,咱们接着往下看。

3.3 发布订阅模式[对比]观察者模式

        从这二者的角色任务来说确实是非常相似的,但是从这二者的实现架构上来说却是非常有区别的,有一个核心不同点!如下我用两张图来示意它两究竟不同在哪里,以此来辅助大家理解。

如下是观察者模式工作示意图:

观察者模式

 如下是发布订阅模式运作示意图:

发布订阅模式

        如上两张图,大家可以很清晰的看到一个非常大的区别:发布订阅模式在两个角色中间是一个中间角色(缓冲区)来过渡的,发布者并不直接与订阅者产生交互。

        大家回想一下【生产者消费者模式】,这个中间过渡区域对应的就是缓冲区。因为这个缓冲区的存在,发布者与订阅者的工作就可以实现大批的解耦作用。发布者不会因为订阅者处理速度慢,从而影响自己的发布任务效率,它只需要稳定&快速生产即可。而订阅者也不用太担心一时来不及处理而导致问题堆积处理不过来,因为有缓冲区在,便可以一点点排队来处理这些消息,这也就是我们所常说的“削峰填谷”。

        而我们所熟知的RabbitMQ、Kafka、RocketMQ等这些MQ中间件的本质其实就是实现【发布订阅模式】中的这个中间缓冲区。而Redis也提供了简单的发布订阅模式功能,当我们有一些简单需求的时候,选择它也再合适不错!

        届此,如果你已经对这些概念有了一定的理解与感悟,那么接下来就跟我一起动手,共同实践探索一番吧,加深你对此的掌握,正所谓:“实践出真知,行动助成长”。

4. 集成Redis 实现消息队列模式🔥 

        如下我先给同学们概括下,针对Spring Boot项目,如何使用Redis实现发布订阅模式的一些重要步骤,同学们请看:

  • 配置redis连接等信息。
  • 添加请求入口。
  • 添加消息发布类。
  • 配置消息监听类(实现MessageListener接口,重写onMessage())。
  • 添加监听容器(配置RedisMessageListenerContainer的bean)。
  • 订阅单频道。
  • 向频道发布消息。
  • 消息发布测试&消息监听。
  • 如何订阅多频道。
  •         接着我们就开始动手吧!实践出真知。 

    4.1 创建Spring Boot应用

    首先,我们先创建个基础的Spring Boot项目,如果还不会点这里,此处就不详细赘述啦。

    4.2 集成Redis

    在你的pom.xml中引入redis的依赖包,示例代码如下:

            
            
                org.springframework.boot
                spring-boot-starter-data-redis
                2.4.5
            
    

    4.3 yaml 配置Redis

            配置Redis连接等相关信息,这里同学们可以闭眼cv,示例代码如下:

    #redis配置
    Spring:
      redis:
        database: 0    #Redis数据库索引(默认为0)
        host: 127.0.0.1  #redis服务器ip,由于我是搭建在本地,固指向本地ip
        port: 6379  #redis服务器连接端口
        password:    #redis服务器连接密码(默认为空)
        # 连接池配置
        jedis.pool:
          max-active: 20      #连接池最大连接数(使用负值表示没有限制)
          max-wait: -1     #连接池最大阻塞等待时间(使用负值表示没有限制)
          max-idle: 10        #连接池中的最大空闲连接
          min-idle: 0         #连接池中的最小空闲连接
          timeout: 1000      #连接超时时间(毫秒)。我设置的是1秒
    

    4.4 创建Controller 

            创建一个接口,目的是用于自定义发布消息,包括从请求中指定监听的通道及消息。 示例代码如下:

    package com.example.demo.controller;
    
    import com.example.demo.component.redis.mediat.RedisMediator;
    import com.example.demo.component.redis.message.Publisher;
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiParam;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * redis相关控制器
     */
    @RestController
    @RequestMapping("/redis")
    @Api(tags = "redis相关控制器", description = "redis相关控制器")
    public class RedisController {
    
        @Autowired
        private Publisher publisher;
    
        /**
         * 模拟发布消息
         */
        @GetMapping("/publish")
        public String publish(@ApiParam("通道名") @RequestParam String channel,
                              @ApiParam("消息体") @RequestParam String message) {
            // 发送消息
            return publisher.sendMessage(channel, message);
        }
    
    
    }
    

    4.5 定义消息生产者

            创建一个消息生产者,注入StringRedisTemplate,用convertAndSend()进行消息发送,示例代码如下:

    package com.example.demo.component.redis.message;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * redis:消息生产者
     */
    @Service
    public class Publisher {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String sendMessage(String channel, String message) {
            try {
                //消息发送
                redisTemplate.convertAndSend(channel, message);
                return "消息发送成功了!";
            } catch (Exception e) {
                e.printStackTrace();
                return "消息发送失败了!";
            }
        }
    }
    

    4.6 创建消息接收者

            配置消息监听类,实现MessageListener接口,重写onMessage()方法,示例代码如下:

    package com.example.demo.component.redis.message;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * redis:接收消息者
     */
    @Component
    public class Receiver implements MessageListener {
        private static Logger logger = LoggerFactory.getLogger(Receiver.class);
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
    
            // 获取消息体
            byte[] messageBody = message.getBody();
            // 使用值序列化器转换
            Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
            // 获取监听的频道
            byte[] channelByte = message.getChannel();
            // 使用字符串序列化器转换
            Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
    
            // 收到消息的处理逻辑,演示的话这里就只做打印处理
            logger.info("---频道---: " + channel);
            // 收到消息的处理逻辑,演示的话这里就只做打印处理
            logger.info("---消息内容---: " + msg);
        }
    }
    

    4.7 配置监听容器并订阅频道

            然后,在RedisConfig中配置一些监听容器并订阅频道,代码如下:

    package com.example.demo.config;
    
    import com.example.demo.component.redis.message.Receiver;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisOperations;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    
    /**
     * redis配置类
     */
    @Configuration
    @ConditionalOnClass(RedisOperations.class)
    @EnableConfigurationProperties(RedisProperties.class)
    public class RedisConfig {
    
        /**
         * 字符串Template
         *
         * @param connection redis连接
         */
        @Bean
        @ConditionalOnMissingBean(StringRedisTemplate.class)
        public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connection) {
            StringRedisTemplate template = new StringRedisTemplate();
            template.setConnectionFactory(connection);
            return template;
        }
    
        /**
         * 配置
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                       Receiver listener) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
            //订阅频道 bug_music 和 bug_news
            //这个container可以添加多个redis频道(监听多个)
            container.addMessageListener(listener, new ChannelTopic("bug_music"));
    //        container.addMessageListener(listener, new ChannelTopic("bug_news"));
            return container;
        }
    
    }
    

    4.8 模拟消息发布及监听

            功能代码都写好了,我们接下来进行测试一下,通过Swagger、PostMan等测试途径,请求消息发布接口,这里图方便,就直接通过浏览器发起请求,例如:

            发起请求成功了,成功把对应的返回体通过str串进行了渲染,示例截图如下: 

            那消息是否监听到了呢?这里我们只需要检查下控制台的日志输出,示例截图如下: 

            很明显是成功拿到了通道为" bug_music "所发布的消息,我们继续发布一条消息,再看!

            倘若我们换个通道名继续发布消息?那么是否还能被监听到呢?

    http://localhost:8080/redis/publish?channel=bug_news&message=每日新闻.doc

            答案肯定是监听不到,因为我们在监听容器中只指定了监听通道为【bug_music】的消息,那么我想监听通道为【bug_news】的消息又要怎么实现呢?

    4.9 如何监听多频道?

    监听多个频道,这里我提供两种方式

    4.9.1 配置方式1

        /**
         * 配置
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                       Receiver listener) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
            //订阅频道 bug_music 和 bug_news
            //这个container可以添加多个redis频道(监听多个)
            container.addMessageListener(listener, new ChannelTopic("bug_music"));
            container.addMessageListener(listener, new ChannelTopic("bug_news"));
            return container;
        }
    

            你只需要对container继续添加指定通道名即可,示例代码可参考如上。分别配置了【bug_music】与【bug_news】,接着我们重启项目测试一波,验证是否能拿到两个通道的消息,测试如下:

            很明显都成功拿到了【bug_music】与【bug_news】通道所发布的消息; 

    4.9.2 配置方式2

            对比方式1,方式2是针对批量监听的情况下,总不能想监听100个通道的消息就配置100条container吧,这也太拉跨了吧。肯定是有办法的,在配置监听容器添加订阅频道时,除了使用ChannelTopic 外,还可以使用通配符的形式订阅消息频道,如可以通过订阅 【bug_*】即表示可同时订阅【bug_music】与【bug_news】,这里我还是针对Redis的监听容器进行部分代码修改,示例代码如下:

    /**
     * redis配置类
     */
    @Configuration
    @ConditionalOnClass(RedisOperations.class)
    @EnableConfigurationProperties(RedisProperties.class)
    public class RedisConfig {
    
        /**
         * 配置
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                       Receiver listener) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
    
            container.addMessageListener(listener,  patternTopic());
            return container;
        }
    
        /**
         * 订阅匹配的多个频道
         */
        @Bean
        public PatternTopic patternTopic() {
            return new PatternTopic("bug_*");
        }
    
    }
    

    经过多请求测试,示例截图结果如下:

            很明显也是成功拿到了【bug_music】与【bug_news】通道所发布的消息。

    4.10 小结

            Redis和MQ都支持订阅与发布功能,但是它们的订阅与发布有以下区别:

  • Redis的订阅与发布是同步的,而MQ的订阅与发布是异步的。在Redis中,当一个客户端发布消息时,所有订阅了该频道的客户端都会立即收到该消息,而在MQ中,发布者发布消息后,订阅者在接收到消息前需要等待中间件的处理。
  • Redis的订阅与发布是一对多的,而MQ的订阅与发布是多对多的。在Redis中,一个客户端可以订阅多个频道,一个频道也可以有多个订阅者,而在MQ中,一个发布者可以发布消息到多个队列,多个订阅者可以从同一个队列接收消息。
  • Redis的订阅与发布是短暂的,而MQ的订阅与发布是持久的。在Redis中,当一个订阅客户端断开连接后,它订阅的频道也会自动取消订阅,而在MQ中,即使订阅者下线了,消息依然可以被持久化存储,直到订阅者重新上线并接收到消息为止。
  • Redis的订阅与发布是基于主题的,而MQ的订阅与发布是基于消息队列的。在Redis中,发布者发布消息到一个主题,订阅者可以订阅该主题并接收所有消息;而在MQ中,发布者发布消息到一个队列,订阅者需要从该队列中接收消息,接收到的消息不会再次发送给其他订阅者。
  •         总的来说,Redis的订阅和发布适用于实时的消息传递,而MQ的订阅和发布适用于异步的消息传递和解耦。当然,选择适合自己的消息队列还需要结合自己的业务场景来判断决定,需要考虑其对业务的侵入性、对技术水平的要求、业务可靠性、及性能等多方面问题,综合考虑下再选择哪一种解决方案。

    ... ...

        ok,以上就是我这期的全部内容啦,如果还想学习更多,你可以看看如下的往期热文推荐哦,每天积累一个奇淫小知识,日积月累下去,你一定能成为令人敬仰的大佬。

    「赠人玫瑰,手留余香」,咱们下期拜拜~~

    5. 热文推荐💭

    若想学习更多,可以参考这篇专栏总结《2023最新首发,全网最全 Spring Boot 学习宝典(附思维导图)》,本专栏致力打造最硬核 Spring Boot 进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中。欢迎大家订阅持续学习。

    在入门及进阶之途,我必助你一臂之力,系统性学习,从入门到精通,带你不走弯路,直奔终点;投资自己,永远性价比最高,都这么说了,你还不赶紧来学??

    本文涉及所有源代码,均已上传至github开源,供同学们一对一参考 GitHub传送门,

    同时,原创开源不易,欢迎给个star🌟,想体验下被🌟的感jio,非常感谢❗

    6. 文末💭

    我是bug菌,CSDN | 阿里云 | 华为云 | 51CTO 等社区博客专家,历届博客之星Top30,掘金年度人气作者Top40,51CTO年度博主Top12,掘金 | InfoQ | 51CTO等社区优质创作者,全网粉丝合计15w+ ;硬核微信公众号「猿圈奇妙屋」,欢迎你的加入!免费白嫖最新BAT互联网公司面试题、4000G pdf电子书籍、简历模板等海量资料。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论