RabbitMQ 如何实现延迟队列?

2023年 9月 6日 89.6k 0

延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种:

  • 未按时支付的订单,30 分钟过期之后取消订单。
  • 给活跃度比较低的用户间隔 N 天之后推送消息,提高活跃度。
  • 新注册会员的用户,等待几分钟之后发送欢迎邮件等。
  • 一、如何实现延迟队列?

    延迟队列有以下两种实现方式:

    • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能。
    • 使用官方提供的延迟插件实现延迟功能。

    早期,大部分公司都会采用第一种方式,而随着 RabbitMQ 3.5.7(2015 年底发布)的延迟插件的发布,因为其使用更简单、更方便,所以它现在才是大家普通会采用的,实现延迟队列的方式,所以本文也只讲第二种方式。

    二、实现延迟队列

    1、安装并启动延迟队列

    (1)下载延迟插件

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    注意:需要根据你自己的 RabbitMQ 服务器端版本选择相同版本的延迟插件,可以在 RabbitMQ 控制台查看:

    (2)将插件放到插件目录

    接下来,将上一步下载的插件放到 RabbitMQ 服务器安装目录,如果是 docker,使用一下命令复制:

    docker cp 宿主机文件 容器名称或ID:容器目录。

    如下图所示:

    之后,进入 docker 容器,查看插件中是否包含延迟队列:

    docker exec -it 容器名称或ID /bin/bash rabbitmq-plugins list

    如下图所示:

    (3)启动插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    如下图所示:

    (4)重启RabbitMQ服务

    安装完 RabbitMQ 插件之后,需要重启 RabbitMQ 服务才能生效。如果使用的是 Docker,只需要重启 Docker 容器即可:

    docker restart 容器名称或ID

    如下图所示:

    (5)验收结果

    在 RabbitMQ 控制台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了,如下图所示:

    (6)手动创建延迟交换器(可选)

    此步骤可选(非必须),因为某些版本下通过程序创建延迟交换器可能会出错,如果出错了,手动创建延迟队列即可,如下图所示:

    2、编写延迟消息实现代码

    (1)配置交换器和队列

    import org.springframework.context.annotation.Configuration;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    
    /**
     * 延迟交换器和队列
     */
    @Configuration
    public class DelayedExchangeConfig {
        public static final String EXCHANGE_NAME = "myDelayedExchange";
        public static final String QUEUE_NAME = "delayed.queue";
        public static final String ROUTING_KEY = "delayed.routing.key";
    
        @Bean
        public CustomExchange delayedExchange() {
            return new CustomExchange(EXCHANGE_NAME,
                    "x-delayed-message", // 消息类型
                    true, // 是否持久化
                    false); // 是否自动删除
        }
    
        @Bean
        public Queue delayedQueue() {
            return QueueBuilder.durable(QUEUE_NAME)
                    .withArgument("x-delayed-type", "direct")
                    .build();
        }
    
        @Bean
        public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
            return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
        }
    }

    (2)定义消息发送方法

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DelayedMessageProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Scheduled(fixedDelay = 5000)
        public void sendDelayedMessage(String message) {
            rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                    DelayedExchangeConfig.ROUTING_KEY,
                    message,
                    messagePostProcessor -> {
                        messagePostProcessor.getMessageProperties().setDelay(10000); // 设置延迟时间,单位毫秒
                        return messagePostProcessor;
                    });
        }
    }

    (3)发送延迟消息

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/delayed")
    public class DelayedMessageController {
        @Autowired
        private DelayedMessageProducer delayedMessageProducer;
    
        @GetMapping("/send")
        public String sendDirectMessage(@RequestParam String message) {
            delayedMessageProducer.sendDelayedMessage(message);
            return "Delayed message sent to Exchange: " + message;
        }
    }

    (4)接收延迟消息

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class DelayedMessageConsumer {
    
        @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
        public void receiveDelayedMessage(String message) {
            System.out.println("Received delayed message: " + message);
        }
    }

    PS:获取本文延迟队列的实现 Demo,请加我:GG_Stone【备注:延迟队列】

    小结

    实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先下载插件、然后配置并重启 RabbitMQ 服务,之后就可以通过编写代码的方式实现延迟队列了。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论