RabbitMQ消息中间件

2023年 9月 25日 100.3k 0

一、安装 RabbitMQ

①:安装

安装笔记:juejin.cn/post/726153…

②:RabbitMq相关命令

1. 创建用户

sudo rabbitmqctl add_user  用户名  密码
sudo rabbitmqctl add_user Coke 123456

image.png

2. 查看所有用户

sudo rabbitmqctl list_users

image.png

3. 设置用户角色

sudo rabbitmqctl set_user_tags Coke administrator

image.png

4. 设置用户权限

sudo rabbitmqctl set_permissions Coke 读   写    配置
sudo rabbitmqctl set_permissions Coke ".*" ".*" ".*"

image.png

5. 查看用户权限

sudo rabbitmqctl list_permissions

image.png

③:RabbitMQ工作模型

image.png

broker相当于mysq1服务器,virtual host相当于数据库(可以有多个数据库)
queue相当于表,消息相当于记录。

消息队列有三个核心要素:消息生产者、消息队列、消息消费者;

  • 生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)

  • 消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)

  • 代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;

  • 连接(Connection):连接RabbitMQ服务器的TCP长连接;

  • 信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道
    进行的;

虚拟主机(Virtual host): 一个虚拟分组,在代码中就是一个字符串,当多个不同的用
户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual
host创建exchange/queue等;(分类比较清晰、相互隔离)

交换机(Exchange): 交换机负责从生产者接收消息,并根据交换机类型分发到对应的消
息队列中,起到一个路由的作用;

路由键(Routing Key): 交换机根据路由键来决定消息分发到哪个队列,路由键是消息
的目的地址;

绑定(Binding): 绑定是队列和交换机的一个关联连接(关联关系);

队列(Queue): 存储消息的缓存;

消息(Message): 由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,
字符串、user对象,json串等等)

二、RabbitMQ -- 交换机

1:交换机类型

1、Fanout Exchange(扇形)

2、Direct Exchange(直连)

3、Topic Exchange(主题)

4、Headers Exchange(头部)

2:扇形交换机(Fanout)

Fanout扇形的,散开的;
扇形交换机投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

image.png

①:导入RabbitMQ依赖


    org.springframework.boot
    spring-boot-starter-amqp

②:配置连接信息

server:
  port: 8801

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

③:添加配置类

定义交换机、队列以及绑定队列与交换机

image.png

@Configuration
public class RabbitConfig {
    // 1. 定义交换机
    // 2. 定义队列
    // 3. 绑定队列
    public static String FANOUT_EXCHANGE_NAME = "fanout_exchange_1";
    public static String QUEUE_NAME_1 = "queue_name_1";
    public static String QUEUE_NAME_2 = "queue_name_2";
    
    /**
     * 定于交换机
     * @DateTime: 2023/8/6 22:48
     *
     * @return FanoutExchange
     * @author: Coke
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).build();
    }
    
    /**
     * 定义队列1
     * @DateTime: 2023/8/6 22:50
     *
     * @return Queue
     * @author: Coke
     */
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable(QUEUE_NAME_1).build();
    }
    
    
    
    /**
     * 定义队列2
     * @DateTime: 2023/8/6 22:51
     *
     * @return Queue
     * @author: Coke
     */
    @Bean
    public Queue queue2(){
        return QueueBuilder.durable(QUEUE_NAME_2).build();
    }
    
    /**
     * 交换机 与 队列1 绑定
     * @DateTime: 2023/8/6 22:52
     *
     * @param fanoutExchange:
     * @param queue1:
     * @return Binding
     * @author: Coke
     */
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    
    /**
     * 交换机 与 队列2 绑定
     * @DateTime: 2023/8/6 22:52
     *
     * @param fanoutExchange:
     * @param queue2:
     * @return Binding
     * @author: Coke
     */
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

④:发送消息

RabbitMQ消息中间件-1

@Component
@Slf4j
public class SendMessage {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(){
        // 定义发送消息
        Message message = new Message("这是扇形交换机发送的1条消息".getBytes());
        rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE_NAME,"",message);
        log.info("消息发送成功!{}",new Date());
    }
}

⑤:配置启动程序后 发送消息

@SpringBootApplication
public class Rabbit01FanoutApplication implements ApplicationRunner {
    @Autowired
    private SendMessage sendMessage;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit01FanoutApplication.class, args);
    }
    
    /**
     * 程序启动时发送消息(程序一启动就会运行该方法)
     * @DateTime: 2023/8/6 22:59
     *
     * @param args:
     * @return void
     * @author: Coke
     */
    @Override
    public void run (ApplicationArguments args) throws Exception {
        sendMessage.sendMessage();
    }
}

启动程序(消息发送成功)

image.png

每个队列中都有一条消息

image.png

⑥:接收消息

这里新建一个模块用来接收消息(一般情况生产者和消费者都是分开的)

RabbitMQ消息中间件-2

01. 引入依赖和配置

引入依赖和配置和发送端一样的(改一下端口号即可)

02. 接收消息

创建ReceiveMessage接收消息的类

@Slf4j
public class ReceiveMessage {
    
    public static final String QUEUE_NAME_1 = "queue_name_1";
    public static final String QUEUE_NAME_2 = "queue_name_2";
    
    /**
     * 接收消息
     * @DateTime: 2023/8/13 15:53
     *
     * @param message:
     * @return void
     * @author: Coke
     */
   // queues = {QUEUE_NAME_1, QUEUE_NAME_2}  指定需要接收哪个队列中的消息(可以是多个)
    @RabbitListener (queues = {QUEUE_NAME_1, QUEUE_NAME_2})
    public void receiveMessage (Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("{} 接收到一条消息:{}", new Date(), msg);
    }
}

启动项目即可

  • 接收到了两条消息

image.png

  • 队列中已经没有消息了

image.png

3:直连交换机(Direct)

RabbitMQ消息中间件-3

RabbitMQ消息中间件-4

①:引入依赖和配置

引入依赖和配置和扇形交换机一样的(改一下端口号即可)

②:添加配置类

定义交换机、队列以及绑定队列与交换机

@Configuration
public class RabbitConfig {
    // 交换机和对队列名
    public static String DIRECT_EXCHANGE_NAME = "direct_exchange_1";
    public static String DIRECT_QUEUE_1 = "direct_queue_1";
    public static String DIRECT_QUEUE_2 = "direct_queue_2";
    
    /**
     * 定义直连交换机
     * @DateTime: 2023/8/13 21:22
     *
     * @return DirectExchange
     * @author: Coke
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).build();
    }
    
    /**
     * 定义队列1
     * @DateTime: 2023/8/13 21:23
     *
     * @return Queue
     * @author: Coke
     */
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable(DIRECT_QUEUE_1).build();
    }
    
    
    /**
     * 定义队列2
     * @DateTime: 2023/8/13 21:23
     *
     * @return Queue
     * @author: Coke
     */
    @Bean
    public Queue queue2(){
        return QueueBuilder.durable(DIRECT_QUEUE_2).build();
    }
    
    // 绑定交换机与队列1 key= "error"
    @Bean
    public Binding bindingQueue1(DirectExchange directExchange, Queue queue1){
        return BindingBuilder.bind(queue1).to(directExchange).with("error");
    }
    
    // 绑定交换机与队列2 key= "info"
    @Bean
    public Binding bindingQueue2(DirectExchange directExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(directExchange).with("info");
    }
    
    // 绑定交换机与队列2 key= "error"
    @Bean
    public Binding bindingQueue3(DirectExchange directExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(directExchange).with("error");
    }
    
    // 绑定交换机与队列2 key= "warning"
    @Bean
    public Binding bindingQueue4(DirectExchange directExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(directExchange).with("warning");
    }
}

③:发送消息

RabbitMQ消息中间件-5

@Slf4j
@Component
public class SendMessage {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        // 创建发送的消息体
        String msg = "这是直连交换机的一条消息";
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
        // 发送
        // convertAndSend(String exchange, String routingKey, Object message)
        rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, "error", message);
        log.info("消息发送成功:{}", new Date());
    }
}

④:配置启动程序后 发送消息

@SpringBootApplication
public class Rabbit02DirectApplication implements ApplicationRunner {
    
    @Autowired
    private SendMessage sendMessage;
    
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit02DirectApplication.class, args);
    }

    @Override
    public void run (ApplicationArguments args) throws Exception {
        sendMessage.sendMsg();
    }
}

消息发送成功,配置的key="error" 所以两个队列都可以接收到消息"

RabbitMQ消息中间件-6

RabbitMQ消息中间件-7

⑤:改变key="info"再次测试

RabbitMQ消息中间件-8

消息发送成功,配置的key="info" 所以只有队列2可以接收到消息"
RabbitMQ消息中间件-9

RabbitMQ消息中间件-10

4:主题交换机(Topic)

RabbitMQ消息中间件-11

image.png

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8084

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.topic
  queueName1: queue.topic.1
  queueName2: queue.topic.2

RabbitMQ消息中间件-12

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("rabbit.exchangeName")
    private String exchangeName;
    
    
    @Value ("${rabbit.queueName1}")
    private String queueName1;
    
    @Value ("${rabbit.queueName2}")
    private String queueName2;
    
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }
    // 定义队列1
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable(queueName1).build();
    }
    // 定义队列2
    @Bean
    public Queue queue2(){
        return QueueBuilder.durable(queueName2).build();
    }
    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queue1, TopicExchange topicExchange){
        return BindingBuilder.bind(queue1).to(topicExchange).with("*.orange.*");
    }
    
    @Bean
    public Binding binding2(Queue queue2, TopicExchange topicExchange){
        return BindingBuilder.bind(queue2).to(topicExchange).with("*.*.rabbit");
    }
    
    @Bean
    public Binding binding3(Queue queue2, TopicExchange topicExchange){
        return BindingBuilder.bind(queue2).to(topicExchange).with("lazy.#");
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("主题交换机".getBytes(StandardCharsets.UTF_8)).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "lazy.orange.rabbit", message);
        log.info("消息发送成!");
    }
}

④:配置启动程序后 发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

消息发送成功,配置的key="lazy.orange.rabbit" 所以两个队列都可以接收到消息"

RabbitMQ消息中间件-13

RabbitMQ消息中间件-14

5:头部交换机(Headers)

RabbitMQ消息中间件-15

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8085

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.headers
  queueName1: queue.headers.1
  queueName2: queue.headers.2

RabbitMQ消息中间件-16

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Value("${rabbit.queueName1}")
    private String queueName1;
    
    @Value("${rabbit.queueName2}")
    private String queueName2;
    
    @Bean
    public HeadersExchange headersExchange(){
        return ExchangeBuilder.headersExchange(exchangeName).build();
    }
    
    
    @Bean
    public Queue queue1(){
        return QueueBuilder.durable(queueName1).build();
    }
    
    @Bean
    public Queue queue2(){
        return QueueBuilder.durable(queueName2).build();
    }
    
    @Bean
    public Binding bindingA(HeadersExchange headersExchange, Queue queue1){
        HashMap map = new HashMap();
        map.put("type", "m");
        map.put("status",1);
        return BindingBuilder.bind(queue1).to(headersExchange).whereAll(map).match();
    }
    
    @Bean
    public Binding bindingB(HeadersExchange headersExchange, Queue queue2){
        HashMap map = new HashMap();
        map.put("type", "s");
        map.put("status",0);
        return BindingBuilder.bind(queue2).to(headersExchange).whereAll(map).match();
    }
    
}

③:发送消息

@Service
public class MessageService {
    
    @Value("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg () {
        // 设置消息头属性与队列 2 一样
        MessageProperties properties = new MessageProperties();
        properties.setHeader("type", "s");
        properties.setHeader("status", 0);
    
        Message message = MessageBuilder.withBody("这是头部交换机的一个消息".getBytes())
              .andProperties(properties).build();
        // 头部交换机 可以不设置路由key
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}

④:配置启动程序后 发送消息

@SpringBootApplication
@Slf4j
public class Rabbit04HeadersApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit04HeadersApplication.class, args);
    }
    
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
        log.info("消息发送成功!{}", new Date());
    }
}

⑤:效果

只有队列2收到了消息

RabbitMQ消息中间件-17

三、RabbitMQ过期消息

1. 设置单条消息的过期时间

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8086

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.ttl1
  queueName: queue.ttl.1

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    
    @Value ("${rabbit.queueName}")
    private String queueName;
    
    
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }
    // 定义队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }

    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("info");
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        MessageProperties properties = new MessageProperties();
        // 设置消息过期时间为20秒
        properties.setExpiration("20000");
    
        Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8))
              .andProperties(properties).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "info", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:效果

RabbitMQ消息中间件-18

1.可以看到队列中有一条消息

RabbitMQ消息中间件-19

2. 查看消息的属性

RabbitMQ消息中间件-20

3.时间到后如果消息没有被取出来就会自动删除

image.png

2. 通过队列属性设置消息的过期时间

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8087

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.ttl2
  queueName: queue.ttl.2

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Value ("${rabbit.queueName}")
    private String queueName;
    
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }
    // 定义队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).ttl(20000).build();
    }
    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("info");
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        MessageProperties properties = new MessageProperties();
        // 设置消息过期时间为20秒
        properties.setExpiration("20000");
    
        Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8))
              .andProperties(properties).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "info", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:效果

image.png

1.可以看到队列中有一条消息

RabbitMQ消息中间件-21

2. 查看队列和消息的属性

RabbitMQ消息中间件-22

RabbitMQ消息中间件-23

3.时间到后如果消息没有被取出来就会自动删除

RabbitMQ消息中间件-24

四、RabbitMQ死信队列

image.png

1. 队列过期

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8089

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeNormalName: exchange.normal.1 # 正常交换机
  queueNormalName: queue.normal.1  # 正常队列
  exchangeDlxName: exchange.dlx.1 # 死信交换机
  queueDlxName: queue.dlx.1  # 死信队列

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Value ("${rabbit.queueNormalName}")
    private String queueNormalName;
    
    @Value ("${rabbit.exchangeDlxName}")
    private String exchangeDlxName;
    
    @Value ("${rabbit.queueDlxName}")
    private String queueDlxName;
    
    // 定义交换机
    @Bean
    public DirectExchange topicNormalExchange(){
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }
    
    @Bean
    public DirectExchange topicDlxExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }
    
    // 定义队列
    @Bean
    public Queue queueNormal(){
        HashMap map = new HashMap();
        map.put("x-message-ttl",20000); // 设置队列的过期时间 (20秒)
        map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
        map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
        return QueueBuilder.durable(queueNormalName).withArguments(map).build();
    }
    
    @Bean
    public Queue queueDlx(){
        return QueueBuilder.durable(queueDlxName).build();
    }
    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
        return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
    }
    
    @Bean
    public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
        return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:效果

  • 1. 消息发送成功

RabbitMQ消息中间件-25

  • 2. 消息进入到正常队列中

RabbitMQ消息中间件-26

  • 3. 20秒后消息进入到死信队列中

image.png

2. 消息过期

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8090

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeNormalName: exchange.normal.2 # 正常交换机
  queueNormalName: queue.normal.2  # 正常队列
  exchangeDlxName: exchange.dlx.2 # 死信交换机
  queueDlxName: queue.dlx.2  # 死信队列

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Value ("${rabbit.queueNormalName}")
    private String queueNormalName;
    
    @Value ("${rabbit.exchangeDlxName}")
    private String exchangeDlxName;
    
    @Value ("${rabbit.queueDlxName}")
    private String queueDlxName;
    
    // 定义交换机
    @Bean
    public DirectExchange topicNormalExchange(){
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }
    
    @Bean
    public DirectExchange topicDlxExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }
    
    // 定义队列
    @Bean
    public Queue queueNormal(){
        HashMap map = new HashMap();
        map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
        map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
        return QueueBuilder.durable(queueNormalName).withArguments(map).build();
    }
    
    @Bean
    public Queue queueDlx(){
        return QueueBuilder.durable(queueDlxName).build();
    }
    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
        return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
    }
    
    @Bean
    public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
        return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg () {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("20000"); // 设置消息过期时间为20秒
        Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:效果

  • 1. 消息发送成功

RabbitMQ消息中间件-27

  • 2. 消息进入到正常队列中

image.png

  • 3. 20秒后消息进入到死信队列中

RabbitMQ消息中间件-28

3. 队列达到最大长度

①:引入依赖和配置

引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8090

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeNormalName: exchange.normal.2 # 正常交换机
  queueNormalName: queue.normal.2  # 正常队列
  exchangeDlxName: exchange.dlx.2 # 死信交换机
  queueDlxName: queue.dlx.2  # 死信队列

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Value ("${rabbit.queueNormalName}")
    private String queueNormalName;
    
    @Value ("${rabbit.exchangeDlxName}")
    private String exchangeDlxName;
    
    @Value ("${rabbit.queueDlxName}")
    private String queueDlxName;
    
    // 定义交换机
    @Bean
    public DirectExchange topicNormalExchange(){
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }
    
    @Bean
    public DirectExchange topicDlxExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }
    
    // 定义队列
    @Bean
    public Queue queueNormal(){
        HashMap map = new HashMap();
        map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
        map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
        return QueueBuilder.durable(queueNormalName).withArguments(map).build();
    }
    
    @Bean
    public Queue queueDlx(){
        return QueueBuilder.durable(queueDlxName).build();
    }
    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
        return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
    }
    
    @Bean
    public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
        return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg () {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("20000"); // 设置消息过期时间为20秒
        Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:效果

  • 1. 消息发送成功

image.png

  • 2. 有5条消息保留在正常队列中 有3条消息保留在死信队列中

RabbitMQ消息中间件-29

  • 3. 队列(先进先出)进入死信队列的3条消息是前3条

RabbitMQ消息中间件-30

4. 消费者拒绝消息不进行重新投递

①:生产者配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8092

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeNormalName: exchange.normal.4 # 正常交换机
  queueNormalName: queue.normal.4  # 正常队列
  exchangeDlxName: exchange.dlx.4 # 私信交换机
  queueDlxName: queue.dlx.4  # 私信队列

②:添加配置类

package com.it.rabbit.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * TODO
 *
 * @author: Coke
 * @DateTime: 2023/08/20/19:15
 **/
@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Value ("${rabbit.queueNormalName}")
    private String queueNormalName;
    
    @Value ("${rabbit.exchangeDlxName}")
    private String exchangeDlxName;
    
    @Value ("${rabbit.queueDlxName}")
    private String queueDlxName;
    
    // 定义交换机
    @Bean
    public DirectExchange topicNormalExchange(){
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }
    
    @Bean
    public DirectExchange topicDlxExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }
    
    // 定义队列
    @Bean
    public Queue queueNormal(){
        HashMap map = new HashMap();
        map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入私信交换机)
        map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
        return QueueBuilder.durable(queueNormalName).withArguments(map).build();
    }
    
    @Bean
    public Queue queueDlx(){
        return QueueBuilder.durable(queueDlxName).build();
    }
    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
        return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
    }
    
    @Bean
    public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
        return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
    }
}

③:发送消息

package com.it.rabbit.service;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * TODO
 *
 * @author: Coke
 * @DateTime: 2023/08/20/19:25
 **/
@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeNormalName}")
    private String exchangeNormalName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg () {
            String sir = "这是第1个消息";
            Message message = MessageBuilder.withBody(sir.getBytes(StandardCharsets.UTF_8)).build();
            // 参数 (交换机   路由key  消息体)
            rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
            log.info("消息发送成!{}", new Date());
    }
}

④:配置启动类发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:消费者配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8092

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456
    listener:
      simple:
        acknowledge-mode: manual # 开启消费者手动确认

⑥:接收消息

package com.it.receiver.serive;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.io.IOException;
import java.util.Date;

/**
 * 接收消息
 *
 * @author: Coke
 * @DateTime: 2023/09/16/16:33
 **/

@Component
@Slf4j
public class ReceiverMsg {
    @RabbitListener (queues = {"queue.normal.4"})
    public void receiverMsg (Message message, Channel channel) throws IOException {
        // 获取消息属性
        MessageProperties properties = message.getMessageProperties();
        // 获取消息的唯一标识(类似身份证号)
        long tag = properties.getDeliveryTag();
        try {
            byte[] body = message.getBody();
            String msg = new String(body);
            log.info("接收到消息{},时间为:{}", msg, new Date());
            int i= 1/0;
            // 消费者手动确认  tag: 消息唯一标识    false: 只确认当前消息(true:批量确认)
            channel.basicAck(tag,false);
        }catch (Exception e){
            e.printStackTrace();
            // tag: 消息唯一标识    false: 只确认当前消息  false: 不重新入队
            channel.basicNack(tag,false,false);
        }
    }
}

⑦:效果

RabbitMQ消息中间件-31

RabbitMQ消息中间件-32

5. 消费者拒绝消息

  • 只需要将接收消费中的channel.basicNack(tag,false,false);改为channel.basicReject(tag, false);

  • 可参考:4. 消费者拒绝消息不进行重新投递

image.png

五、RabbitMQ延迟队列

1. 采用消息中间件(rabbitmq)

问题:先发送的消息,延时时间长,会影响到后面的消息

RabbitMQ消息中间件-33

①:引入依赖和配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8093

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.delay.1 # 既是正常交换机 又是 死信交换机
  queueNormalName: queue.delay.normal.1
  queueDlxName: queue.delay.dlx.1

②:添加配置类

package com.it.rabbit.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * TODO
 *
 * @author: Coke
 * @DateTime: 2023/08/20/19:15
 **/
@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Value ("${rabbit.queueNormalName}")
    private String queueNormalName;
    
    @Value ("${rabbit.queueDlxName}")
    private String queueDlxName;
    
    // 定义交换机
    @Bean
    public TopicExchange topicExchange () {
        return ExchangeBuilder
              .topicExchange(exchangeName)
              .build();
    }
    
    // 定义正常队列
    @Bean
    public Queue queueNormal () {
        return QueueBuilder
              .durable(queueNormalName) // 队列名称
              .ttl(25000)  // 队列过期时间
              .deadLetterExchange(exchangeName)  // 设置死信交换机
              .deadLetterRoutingKey("error")  // 设置死信路由key
              .build();
    }
    
    // 定义死信队列
    @Bean
    public Queue queueDlx () {
        return QueueBuilder.durable(queueDlxName).build();
    }
    
    
    // 绑定正常队列与交换机
    @Bean
    public Binding bindingNormal (Queue queueNormal, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueNormal).to(topicExchange).with("order");
    }
    
    // 绑定正常死信与交换机
    @Bean
    public Binding bindingDlx (Queue queueDlx, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueDlx).to(topicExchange).with("error");
    }
    
}

③:发送消息

package com.it.rabbit.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * TODO
 *
 * @author: Coke
 * @DateTime: 2023/08/20/19:25
 **/
@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("这是一个延时队列的消息".getBytes(StandardCharsets.UTF_8)).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "order", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:配置启动类发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
    @Autowired
    private MessageService messageService;
    
    public static void main (String[] args) {
        SpringApplication.run(Rabbit03TopicApplication.class, args);
    }
    
    @Override
    public void run (ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

⑤:效果

  • 正常队列中有一条消息

RabbitMQ消息中间件-34

  • 25秒后消息进入到死信队列中

RabbitMQ消息中间件-35

2. 使用插件(rabbitmq-delayed-message-exchange)

RabbitMQ消息中间件-36

①:下载插件

1. 官网:github.com/rabbitmq/ra…

RabbitMQ消息中间件-37

RabbitMQ消息中间件-38

②:安装插件

1. 将插件复制到linux中rabbitmq的/plugins目录下

RabbitMQ消息中间件-39

2. 解压 unzip rabbitmq_delayed_message_exchange-3.9.0.ez

image.png

  • 解压后

RabbitMQ消息中间件-40

③:启动插件

1. 可以先使用命令列出所有的插件名字 rabbitmq_delayed_message_exchange

RabbitMQ消息中间件-41

2.启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

RabbitMQ消息中间件-42

RabbitMQ消息中间件-43

** 3.可以看到多了一种交换机类型**

RabbitMQ消息中间件-44

④: 引入依赖与配置

image.png


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8094

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.delay.2 # 正常交换机
  queueDelayName: queue.delay.2  # 正常队列

⑤:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Value ("${rabbit.queueDelayName}")
    private String queueDelayName;
    
    // 定义交换机 (自定义交换机)
    @Bean
    public CustomExchange customExchange () {
        HashMap arguments = new HashMap();
        arguments.put("x-delayed-type", "direct"); // 放一个参数
        return new CustomExchange(exchangeName, "x-delayed-message", true, false, arguments);
    }
    
    // 定义正常队列
    @Bean
    public Queue queueNormal () {
        return QueueBuilder
              .durable(queueDelayName) // 队列名称
              .build();
    }
    
    // 绑定正常队列与交换机
    @Bean
    public Binding bindingNormal (Queue queueNormal, CustomExchange customExchange) {
        // 绑定 需要指定路由key
        return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs();
    }
    
}

⑥:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg () {
        {
            MessageProperties properties = new MessageProperties();
            properties.setHeader("x-delay", 25000); // 延时25秒
            Message message = MessageBuilder.withBody("这是一个延时25秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
            // 参数 (交换机   路由key  消息体)
            rabbitTemplate.convertAndSend(exchangeName, "plugin", message);
            log.info("延时25秒消息发送成!{}", new Date());
        }
        {
            MessageProperties properties = new MessageProperties();
            properties.setHeader("x-delay", 15000); // 延时15秒
            Message message = MessageBuilder.withBody("这是一个延时15秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
            // 参数 (交换机   路由key  消息体)
            rabbitTemplate.convertAndSend(exchangeName, "plugin", message);
            log.info("延时15秒消息发送成!{}", new Date());
        }

    }
}

⑦:接收消息

@Component
@Slf4j
public class ReceiveMessage {
    
    @RabbitListener (queues = "queue.delay.2")
    public void receiveMessage (Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到消息:{}, 时间为:{}", msg, new Date());
    }
}

⑧:可以看到消息延时时间长的并没有影响到消息延时时间短的

image.png

六、消息可靠性(消息到达交换机)

1. confirm确认模式

image.png

①:配置文件(开启确认模式)


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8097

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456
    publisher-confirm-type: correlated  # 开启生产者的消息确认模式

rabbit:
  exchangeName: exchange.confirm.1
  queueName: queue.confirm.1

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    
    @Value ("${rabbit.queueName}")
    private String queueName;
    
    
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }
    // 定义队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }

    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("info");
    }
}
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm (CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            log.info("消息正确的达到交换机!");
            return;
        }
        log.error("消息没有到达交换机,原因为:{}", cause);
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private MyConfirmCallBack myConfirmCallBack;
    
    @PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(myConfirmCallBack);
    }
    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("id_110"); // 设置消息的关联id
        // 参数 (交换机   路由key  消息体  相关生产者确认的数据)
        rabbitTemplate.convertAndSend(exchangeName, "info", message,correlationData);
        log.info("消息发送成!{}", new Date());
    }
}

④:测试

1.发送消息时 故意将交换机名字写错 模拟错误

RabbitMQ消息中间件-45

2.正常测试

RabbitMQ消息中间件-46

2.confirm确认模式(lambda表达式写法)

RabbitMQ消息中间件-47

MessageService(发送消息类)

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    
/*    @PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(
              new RabbitTemplate.ConfirmCallback() {
                  @Override
                  public void confirm (CorrelationData correlationData, boolean ack, String cause) {
                      if (ack){
                          log.info("消息正确的达到交换机!");
                          return;
                      }
                      log.error("消息没有到达交换机,原因为:{}", cause);
                  }
              }
        );
    }*/
    
    @PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
    public void init () {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息正确的达到交换机!");
                return;
            }
            log.error("消息没有到达交换机,原因为:{}", cause);
        });
    }
    
    public void sendMsg () {
        Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("id_110"); // 设置消息的关联id
        // 参数 (交换机   路由key  消息体  相关生产者确认的数据)
        rabbitTemplate.convertAndSend(exchangeName, "info", message, correlationData);
        log.info("消息发送成!{}", new Date());
    }
}

七、消息可靠性(交换机到队列)

image.png

1. Return模式

RabbitMQ消息中间件-48

①:配置文件(开启确认模式)


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8098

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456
    publisher-returns: true # 开启return模式

rabbit:
  exchangeName: exchange.return.1
  queueName: queue.return.1

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    
    @Value ("${rabbit.queueName}")
    private String queueName;
    
    
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }
    // 定义队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }

    
    // 绑定队列与交换机
    @Bean
    public Binding binding1(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("info");
    }
}
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
    
    // 只有消息从交换机没有正确到达队列时才会回调
    @Override
    public void returnedMessage (ReturnedMessage returned) {
        log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText());
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private MyReturnCallBack myReturnCallBack;
    
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(myReturnCallBack);
    }
    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "info11", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:测试

1.发送消息时 故意将路由key写错 模拟错误

image.png

RabbitMQ消息中间件-49

2.正常测试(消息发送成功,没有报错)

RabbitMQ消息中间件-50

RabbitMQ消息中间件-51

2. Return模式(lambda表达式写法)

image.png

MessageService(发送消息类)

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init () {
        rabbitTemplate.setReturnsCallback(
              returned -> log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText())
        );
    }
    
    public void sendMsg () {
        Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "info", message);
        log.info("消息发送成!{}", new Date());
    }
}

3. 交换机属性

// 定义交换机
@Bean
public TopicExchange topicExchange(){
    return ExchangeBuilder.
          topicExchange(exchangeName)
          .durable(false) // isDurable – 持久标志(默认为 true)
          .autoDelete() // 设置自动删除 (默认不是自动删除)
          .build();
}

RabbitMQ消息中间件-52

八、备用交换机

RabbitMQ消息中间件-53

RabbitMQ消息中间件-54

1. 实现

①:引入依赖与配置


   org.springframework.boot
   spring-boot-starter-amqp

server:
  port: 8110

spring:
  rabbitmq:
    host: 192.168.100.134
    port: 5672
    username: Coke
    password: 123456

rabbit:
  exchangeName: exchange.alternate.normal.1  # 正常交换机
  exchangeAlternateName: exchange.alternate.alternate.1 # 备用交换机
  queueName: queue.alternate.normal.1  # 正常队列
  queueAlternateName: queue.alternate.alternate.1  # 备用队列

②:添加配置类

@Configuration
public class RabbitConfig {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Value ("${rabbit.queueName}")
    private String queueName;
    
    @Value ("${rabbit.exchangeAlternateName}")
    private String exchangeAlternateName;
    
    @Value ("${rabbit.queueAlternateName}")
    private String queueAlternateName;
    
    
    // 正常交换机
    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.
              topicExchange(exchangeName)
              .alternate(exchangeAlternateName) // 设置备用交换机
              .build();
    }
    // 正常队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }
    
    // 绑定正常队列与交换机
    @Bean
    public Binding binding1(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("info");
    }
    
    // 备用交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();
    }
    
    // 备用队列
    @Bean
    public Queue queueAlternate(){
        return QueueBuilder.durable(queueAlternateName).build();
    }
    
    // 绑定备用交换机与队列
    @Bean
    public Binding binding(Queue queueAlternate, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queueAlternate).to(fanoutExchange);
    }
}

③:发送消息

@Service
@Slf4j
public class MessageService {
    
    @Value ("${rabbit.exchangeName}")
    private String exchangeName;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
        // 参数 (交换机   路由key  消息体)
        rabbitTemplate.convertAndSend(exchangeName, "info", message);
        log.info("消息发送成!{}", new Date());
    }
}

④:测试效果

1. 正常测试(指定正确路由key)

image.png

RabbitMQ消息中间件-55

2.非正常测试(指定错误的路由key)通过备用交换机进入备用队列中

RabbitMQ消息中间件-56

RabbitMQ消息中间件-57

RabbitMQ消息中间件-58

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论