SpringBoot整合RabbitMQ延迟队列&优先级队列详解

2023年 8月 10日 17.6k 0

延迟队列

延迟队列:简单说就是发送出去的消息经过给定的时间后,消费者才能看见消息(消费消息)。

这里简单说下步骤:

  • 创建一个队列,如:bs-queue, 设置死信交换机(死信交换机路由key(这是可选的))及队列,如:dead-exchange; 消息的消费端监听该dead-queue队列。设置消息有效期参数x-message-ttl参数(值为自己需要延迟的时间,单位:毫秒)。
  • 发送消息发送到bs-queue上。由于消息消费端监听的是死信队列,所以只需要等待指定的时间后消息会自动被转发到死信队列上(dead-queue)。
  • 消息的消费端监听dead-queu队列即可。
  • 优先级队列

    优先级队列是在RabbitMQ3.5.0之后的版本才支持的。

    具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。

    队列的优先级通过x-max-priority参数设置。

    建立一个priority-exchange交换机,类型:direct。

    图片图片

    建立一个priority-queue队列,并与priority-exchange绑定。

    图片图片

    设置x-max-priority参数的值为100,表示最大优先级为100。

    注意:x-max-priority参数的值应该介于1到255。建议使用1到10之间的队列。如果设置的优先级更大将使用更多的Erlang进程消耗更多的CPU资源。运行时调度也会受到影响。

    接下来演示优先级队列

    我们先只发送消息,然后再把消息的消费功能打开。

    发送消息接口:

    @GetMapping("/sendPriority")
    public Object sendPriority(String msg, Integer priority) {
      ms.sendPriorityQueue(msg, priority) ;
      return "success" ;
    }
    public void sendPriorityQueue(String msg, Integer priority) {
      logger.info("准备发送消息:{}", msg);
      Message message = MessageBuilder.withBody(msg.getBytes()).setPriority(priority).build() ;
      rabbitTemplate.convertAndSend("priority-exchange", "pe.msg", message) ;
    }

    发送4条消息:

    // 第一条消息
    msg=第一条消息&priority=2 
    // 第二条消息
    msg=第二条消息&priority=10
    // 第三条消息
    msg=第三条消息&priority=1
    // 第四条消息
    msg=第四条消息&priority=7

    查看消息队列:

    图片图片

    消息消费端:

    @RabbitListener(queues = { "priority-queue" })
    @RabbitHandler
    public void listenerPriority(Message message, Channel channel) {
      System.out.println("接受到消息.....income");
      byte[] body = message.getBody();
      MessageProperties mps = message.getMessageProperties();
      String content = new String(body, Charset.forName("UTF-8"));
      try {
        System.out.println("接受到消息来自交换机: 【" + mps.getReceivedExchange() + "】, 队列:【" + mps.getConsumerQueue()+ "】:n内容: " + content);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
      } catch (Exception e) {
        e.printStackTrace();
        try {
          channel.basicReject(mps.getDeliveryTag(), false);
        } catch (IOException e1) {
          e1.printStackTrace() ;
        }
      }
    }

    启动服务

    图片图片

    根据打印出的结果,正好是我们设置优先级的顺序输出。

    上面设置的消息优先级都是在指定的范围

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论