RabbitMQ延迟消费

2023年 9月 7日 67.6k 0

前言

日常开发中我们都有需要某个任务延迟执行的场景,比如说订单在一定时间内自动关闭,或者某个操作后进行相应的延迟检查...

要达成任务的延迟执行有很多方法,今天主要用RabbitMQ,并以Nestjs为示例代码进行讲解。

用到的技术

  • Docker
  • RabbitMQs
  • NestJS

这些技术就不一一介绍了,不懂的自行搜索(或者问问神奇的ChatGPT)。

RabbitMQ

基本介绍

RabbitMQ的话我是用Docker进行安装的,安装的是rabbitmq:3.7.7-management

本篇文章主要通过延迟交换机去进行消息的延迟推送,这一块需要安装一个rabbitmq-delayed-message-exchange的插件,关于插件版本的话,要注意是否适合你的rabbitmq版本,比如说我安装的是3.7的,那就要找对应的版本,不然会报错:
01.png

安装RabbitMQ插件

  • 下载对应版本的.ez文件:
  • 02.png

  • 将下载好的文件拷贝到容器中:
  • docker cp yourLocation/rabbitmq_delayed_message_exchange-3.8.0.ez yourCotainerId:/plugins
    

    当然,你可以通过Docker的volume将文件映射到容器中,我比较喜欢这种方式。

  • 执行命令进入容器:
  • docker run -it yourCotainerId /bin/bash
    
  • 执行命令安装插件:
  • rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    没问题的话会输出这些信息,如果有报错的话这里的started 1 plugins就不会显示:
    03.png

  • 然后退出容器,重启一下容器。
  • docker restart yourCotainerId
    

    NestJS

    应用层面的话我们用@golevelup/nestjs-rabbitmq这个库来进行消息的发送与消费。

    交换器配置

    交换器的typex-delayed-message则是使用延迟交换器,对应的消息也会被延迟消费,主要注意typearguments里的{ 'x-delayed-type': 'direct' }记得填就行。

    const mqClient = RabbitMQModule.forRootAsync(RabbitMQModule, {
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => {
        const urlInfo = configService.get('rabbitMQ.urlInfo');
        return {
          exchanges: [
            {
              name: MqExchange.DIRECT,
              type: 'direct',
              options: {
                durable: true,
              },
            },
            {
              name: MqExchange.DIRECT_DELAYED,
              type: 'x-delayed-message',
              options: {
                durable: true,
                arguments: { 'x-delayed-type': 'direct' },
              },
            },
          ],
          uri: `${urlInfo.protocol}://${urlInfo.username}:${urlInfo.password}@${urlInfo.hostname}:${urlInfo.port}/${urlInfo.vhost}`,
          channels: {
            'channel-1': {
              prefetchCount: 1,
              default: true,
            },
          },
        };
      },
      inject: [ConfigService],
    });
    

    消息推送

    消息推送的话只要在publish函数的第4个参数里加上x-delay,单位是毫秒,这个就是对应的延迟时间。

    public async push2Queue(items) {
        const res = await this.amqpConnection.publish(
          MqExchange.DIRECT_DELAYED,
          'test',
          { message: 'hello' },
          {
            headers: {
              'x-delay': 10000,
            },
          },
        );
      }
    

    消息消费

    消息消费的话没啥好说,对应的交换机、路由和队列配置好就行。

    @RabbitRPC({
        exchange: MqExchange.DIRECT_DELAYED,
        routingKey: 'test',
        queue: 'test3',
        queueOptions: {
          durable: true,
          exclusive: false,
          autoDelete: false,
        },
      })
      public async ackMessage(item) {
        console.log('delayed msg', item);
      }
    

    测试

    我们推送一条消息,在RabbitMQ的管理后台可以看到有一条延迟的消息:
    04.png
    10秒后我们会收到一条消息:

    delayed msg { message: 'hello' }
    

    总结

    实现任务延迟消费的方式还有很多,比如说RabbitMQ还可以用死信队列去实现。方法总是有很多的,找到适合你们团队的就行。

    原文地址

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论