一、安装 RabbitMQ
①:安装
安装笔记:juejin.cn/post/726153…
②:RabbitMq相关命令
1. 创建用户
| sudo rabbitmqctl add_user 用户名 密码 |
| sudo rabbitmqctl add_user Coke 123456 |

2. 查看所有用户
sudo rabbitmqctl list_users

3. 设置用户角色
sudo rabbitmqctl set_user_tags Coke administrator

4. 设置用户权限
| sudo rabbitmqctl set_permissions Coke 读 写 配置 |
| sudo rabbitmqctl set_permissions Coke ".*" ".*" ".*" |

5. 查看用户权限
sudo rabbitmqctl list_permissions

③:RabbitMQ工作模型

broker相当于mysq1服务器,virtual host相当于数据库(可以有多个数据库)
queue相当于表,消息相当于记录。
消息队列有三个核心要素:消息生产者、消息队列、消息消费者;
-
生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)
-
消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)
-
代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;
-
连接(Connection):连接RabbitMQ服务器的TCP长连接;
-
信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道
进行的;
虚拟主机(Virtual host): 一个虚拟分组,在代码中就是一个字符串,当多个不同的用
户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual
host创建exchange/queue等;(分类比较清晰、相互隔离)
交换机(Exchange): 交换机负责从生产者接收消息,并根据交换机类型分发到对应的消
息队列中,起到一个路由的作用;
路由键(Routing Key): 交换机根据路由键来决定消息分发到哪个队列,路由键是消息
的目的地址;
绑定(Binding): 绑定是队列和交换机的一个关联连接(关联关系);
队列(Queue): 存储消息的缓存;
消息(Message): 由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,
字符串、user对象,json串等等)
二、RabbitMQ -- 交换机
1:交换机类型
1、Fanout Exchange(扇形)
2、Direct Exchange(直连)
3、Topic Exchange(主题)
4、Headers Exchange(头部)
2:扇形交换机(Fanout)
Fanout扇形的,散开的;
扇形交换机投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

①:导入RabbitMQ依赖
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
②:配置连接信息
| server: |
| port: 8801 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
③:添加配置类
定义交换机、队列以及绑定队列与交换机

| @Configuration |
| public class RabbitConfig { |
| |
| |
| |
| public static String FANOUT_EXCHANGE_NAME = "fanout_exchange_1"; |
| public static String QUEUE_NAME_1 = "queue_name_1"; |
| public static String QUEUE_NAME_2 = "queue_name_2"; |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public FanoutExchange fanoutExchange(){ |
| return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).build(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public Queue queue1(){ |
| return QueueBuilder.durable(QUEUE_NAME_1).build(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public Queue queue2(){ |
| return QueueBuilder.durable(QUEUE_NAME_2).build(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){ |
| return BindingBuilder.bind(queue1).to(fanoutExchange); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){ |
| return BindingBuilder.bind(queue2).to(fanoutExchange); |
| } |
| } |
④:发送消息

| @Component |
| @Slf4j |
| public class SendMessage { |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMessage(){ |
| |
| Message message = new Message("这是扇形交换机发送的1条消息".getBytes()); |
| rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE_NAME,"",message); |
| log.info("消息发送成功!{}",new Date()); |
| } |
| } |
⑤:配置启动程序后 发送消息
| @SpringBootApplication |
| public class Rabbit01FanoutApplication implements ApplicationRunner { |
| @Autowired |
| private SendMessage sendMessage; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit01FanoutApplication.class, args); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| sendMessage.sendMessage(); |
| } |
| } |
启动程序(消息发送成功)

每个队列中都有一条消息

⑥:接收消息
这里新建一个模块用来接收消息(一般情况生产者和消费者都是分开的)

01. 引入依赖和配置
引入依赖和配置和发送端一样的(改一下端口号即可)
02. 接收消息
创建ReceiveMessage接收消息的类
| @Slf4j |
| public class ReceiveMessage { |
| |
| public static final String QUEUE_NAME_1 = "queue_name_1"; |
| public static final String QUEUE_NAME_2 = "queue_name_2"; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @RabbitListener (queues = {QUEUE_NAME_1, QUEUE_NAME_2}) |
| public void receiveMessage (Message message) { |
| byte[] body = message.getBody(); |
| String msg = new String(body); |
| log.info("{} 接收到一条消息:{}", new Date(), msg); |
| } |
| } |
启动项目即可


3:直连交换机(Direct)


①:引入依赖和配置
引入依赖和配置和扇形交换机一样的(改一下端口号即可)
②:添加配置类
定义交换机、队列以及绑定队列与交换机
| @Configuration |
| public class RabbitConfig { |
| |
| public static String DIRECT_EXCHANGE_NAME = "direct_exchange_1"; |
| public static String DIRECT_QUEUE_1 = "direct_queue_1"; |
| public static String DIRECT_QUEUE_2 = "direct_queue_2"; |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public DirectExchange directExchange(){ |
| return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).build(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public Queue queue1(){ |
| return QueueBuilder.durable(DIRECT_QUEUE_1).build(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Bean |
| public Queue queue2(){ |
| return QueueBuilder.durable(DIRECT_QUEUE_2).build(); |
| } |
| |
| |
| @Bean |
| public Binding bindingQueue1(DirectExchange directExchange, Queue queue1){ |
| return BindingBuilder.bind(queue1).to(directExchange).with("error"); |
| } |
| |
| |
| @Bean |
| public Binding bindingQueue2(DirectExchange directExchange, Queue queue2){ |
| return BindingBuilder.bind(queue2).to(directExchange).with("info"); |
| } |
| |
| |
| @Bean |
| public Binding bindingQueue3(DirectExchange directExchange, Queue queue2){ |
| return BindingBuilder.bind(queue2).to(directExchange).with("error"); |
| } |
| |
| |
| @Bean |
| public Binding bindingQueue4(DirectExchange directExchange, Queue queue2){ |
| return BindingBuilder.bind(queue2).to(directExchange).with("warning"); |
| } |
| } |
③:发送消息

| @Slf4j |
| @Component |
| public class SendMessage { |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| |
| String msg = "这是直连交换机的一条消息"; |
| Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build(); |
| |
| |
| rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, "error", message); |
| log.info("消息发送成功:{}", new Date()); |
| } |
| } |
④:配置启动程序后 发送消息
| @SpringBootApplication |
| public class Rabbit02DirectApplication implements ApplicationRunner { |
| |
| @Autowired |
| private SendMessage sendMessage; |
| |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit02DirectApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| sendMessage.sendMsg(); |
| } |
| } |
消息发送成功,配置的key="error" 所以两个队列都可以接收到消息"


⑤:改变key="info"再次测试

消息发送成功,配置的key="info" 所以只有队列2可以接收到消息"


4:主题交换机(Topic)


①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8084 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.topic |
| queueName1: queue.topic.1 |
| queueName2: queue.topic.2 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("rabbit.exchangeName") |
| private String exchangeName; |
| |
| |
| @Value ("${rabbit.queueName1}") |
| private String queueName1; |
| |
| @Value ("${rabbit.queueName2}") |
| private String queueName2; |
| |
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder.topicExchange(exchangeName).build(); |
| } |
| |
| @Bean |
| public Queue queue1(){ |
| return QueueBuilder.durable(queueName1).build(); |
| } |
| |
| @Bean |
| public Queue queue2(){ |
| return QueueBuilder.durable(queueName2).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queue1, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue1).to(topicExchange).with("*.orange.*"); |
| } |
| |
| @Bean |
| public Binding binding2(Queue queue2, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue2).to(topicExchange).with("*.*.rabbit"); |
| } |
| |
| @Bean |
| public Binding binding3(Queue queue2, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue2).to(topicExchange).with("lazy.#"); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| Message message = MessageBuilder.withBody("主题交换机".getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "lazy.orange.rabbit", message); |
| log.info("消息发送成!"); |
| } |
| } |
④:配置启动程序后 发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
消息发送成功,配置的key="lazy.orange.rabbit" 所以两个队列都可以接收到消息"


5:头部交换机(Headers)

①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8085 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.headers |
| queueName1: queue.headers.1 |
| queueName2: queue.headers.2 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Value("${rabbit.queueName1}") |
| private String queueName1; |
| |
| @Value("${rabbit.queueName2}") |
| private String queueName2; |
| |
| @Bean |
| public HeadersExchange headersExchange(){ |
| return ExchangeBuilder.headersExchange(exchangeName).build(); |
| } |
| |
| |
| @Bean |
| public Queue queue1(){ |
| return QueueBuilder.durable(queueName1).build(); |
| } |
| |
| @Bean |
| public Queue queue2(){ |
| return QueueBuilder.durable(queueName2).build(); |
| } |
| |
| @Bean |
| public Binding bindingA(HeadersExchange headersExchange, Queue queue1){ |
| HashMap map = new HashMap(); |
| map.put("type", "m"); |
| map.put("status",1); |
| return BindingBuilder.bind(queue1).to(headersExchange).whereAll(map).match(); |
| } |
| |
| @Bean |
| public Binding bindingB(HeadersExchange headersExchange, Queue queue2){ |
| HashMap map = new HashMap(); |
| map.put("type", "s"); |
| map.put("status",0); |
| return BindingBuilder.bind(queue2).to(headersExchange).whereAll(map).match(); |
| } |
| |
| } |
③:发送消息
| @Service |
| public class MessageService { |
| |
| @Value("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg () { |
| |
| MessageProperties properties = new MessageProperties(); |
| properties.setHeader("type", "s"); |
| properties.setHeader("status", 0); |
| |
| Message message = MessageBuilder.withBody("这是头部交换机的一个消息".getBytes()) |
| .andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "", message); |
| } |
| } |
④:配置启动程序后 发送消息
| @SpringBootApplication |
| @Slf4j |
| public class Rabbit04HeadersApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit04HeadersApplication.class, args); |
| } |
| |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| log.info("消息发送成功!{}", new Date()); |
| } |
| } |
⑤:效果
只有队列2收到了消息

三、RabbitMQ过期消息
1. 设置单条消息的过期时间
①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8086 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.ttl1 |
| queueName: queue.ttl.1 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| |
| @Value ("${rabbit.queueName}") |
| private String queueName; |
| |
| |
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder.topicExchange(exchangeName).build(); |
| } |
| |
| @Bean |
| public Queue queue(){ |
| return QueueBuilder.durable(queueName).build(); |
| } |
| |
| |
| |
| @Bean |
| public Binding binding1(Queue queue, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue).to(topicExchange).with("info"); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| MessageProperties properties = new MessageProperties(); |
| |
| properties.setExpiration("20000"); |
| |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)) |
| .andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动程序后发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:效果

1.可以看到队列中有一条消息

2. 查看消息的属性

3.时间到后如果消息没有被取出来就会自动删除

2. 通过队列属性设置消息的过期时间
①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8087 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.ttl2 |
| queueName: queue.ttl.2 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Value ("${rabbit.queueName}") |
| private String queueName; |
| |
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder.topicExchange(exchangeName).build(); |
| } |
| |
| @Bean |
| public Queue queue(){ |
| return QueueBuilder.durable(queueName).ttl(20000).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queue, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue).to(topicExchange).with("info"); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| MessageProperties properties = new MessageProperties(); |
| |
| properties.setExpiration("20000"); |
| |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)) |
| .andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动程序后发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:效果

1.可以看到队列中有一条消息

2. 查看队列和消息的属性


3.时间到后如果消息没有被取出来就会自动删除

四、RabbitMQ死信队列

1. 队列过期
①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8089 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeNormalName: exchange.normal.1 |
| queueNormalName: queue.normal.1 |
| exchangeDlxName: exchange.dlx.1 |
| queueDlxName: queue.dlx.1 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Value ("${rabbit.queueNormalName}") |
| private String queueNormalName; |
| |
| @Value ("${rabbit.exchangeDlxName}") |
| private String exchangeDlxName; |
| |
| @Value ("${rabbit.queueDlxName}") |
| private String queueDlxName; |
| |
| |
| @Bean |
| public DirectExchange topicNormalExchange(){ |
| return ExchangeBuilder.directExchange(exchangeNormalName).build(); |
| } |
| |
| @Bean |
| public DirectExchange topicDlxExchange(){ |
| return ExchangeBuilder.directExchange(exchangeDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Queue queueNormal(){ |
| HashMap map = new HashMap(); |
| map.put("x-message-ttl",20000); |
| map.put("x-dead-letter-exchange",exchangeDlxName); |
| map.put("x-dead-letter-routing-key","error"); |
| return QueueBuilder.durable(queueNormalName).withArguments(map).build(); |
| } |
| |
| @Bean |
| public Queue queueDlx(){ |
| return QueueBuilder.durable(queueDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){ |
| return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order"); |
| } |
| |
| @Bean |
| public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){ |
| return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error"); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeNormalName, "order", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动程序后发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:效果



2. 消息过期
①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8090 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeNormalName: exchange.normal.2 |
| queueNormalName: queue.normal.2 |
| exchangeDlxName: exchange.dlx.2 |
| queueDlxName: queue.dlx.2 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Value ("${rabbit.queueNormalName}") |
| private String queueNormalName; |
| |
| @Value ("${rabbit.exchangeDlxName}") |
| private String exchangeDlxName; |
| |
| @Value ("${rabbit.queueDlxName}") |
| private String queueDlxName; |
| |
| |
| @Bean |
| public DirectExchange topicNormalExchange(){ |
| return ExchangeBuilder.directExchange(exchangeNormalName).build(); |
| } |
| |
| @Bean |
| public DirectExchange topicDlxExchange(){ |
| return ExchangeBuilder.directExchange(exchangeDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Queue queueNormal(){ |
| HashMap map = new HashMap(); |
| map.put("x-dead-letter-exchange",exchangeDlxName); |
| map.put("x-dead-letter-routing-key","error"); |
| return QueueBuilder.durable(queueNormalName).withArguments(map).build(); |
| } |
| |
| @Bean |
| public Queue queueDlx(){ |
| return QueueBuilder.durable(queueDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){ |
| return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order"); |
| } |
| |
| @Bean |
| public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){ |
| return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error"); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg () { |
| MessageProperties properties = new MessageProperties(); |
| properties.setExpiration("20000"); |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeNormalName, "order", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动程序后发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:效果



3. 队列达到最大长度
①:引入依赖和配置
引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8090 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeNormalName: exchange.normal.2 |
| queueNormalName: queue.normal.2 |
| exchangeDlxName: exchange.dlx.2 |
| queueDlxName: queue.dlx.2 |

②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Value ("${rabbit.queueNormalName}") |
| private String queueNormalName; |
| |
| @Value ("${rabbit.exchangeDlxName}") |
| private String exchangeDlxName; |
| |
| @Value ("${rabbit.queueDlxName}") |
| private String queueDlxName; |
| |
| |
| @Bean |
| public DirectExchange topicNormalExchange(){ |
| return ExchangeBuilder.directExchange(exchangeNormalName).build(); |
| } |
| |
| @Bean |
| public DirectExchange topicDlxExchange(){ |
| return ExchangeBuilder.directExchange(exchangeDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Queue queueNormal(){ |
| HashMap map = new HashMap(); |
| map.put("x-dead-letter-exchange",exchangeDlxName); |
| map.put("x-dead-letter-routing-key","error"); |
| return QueueBuilder.durable(queueNormalName).withArguments(map).build(); |
| } |
| |
| @Bean |
| public Queue queueDlx(){ |
| return QueueBuilder.durable(queueDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){ |
| return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order"); |
| } |
| |
| @Bean |
| public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){ |
| return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error"); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg () { |
| MessageProperties properties = new MessageProperties(); |
| properties.setExpiration("20000"); |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeNormalName, "order", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动程序后发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:效果

- 2. 有5条消息保留在正常队列中 有3条消息保留在死信队列中

- 3. 队列(先进先出)进入死信队列的3条消息是前3条

4. 消费者拒绝消息不进行重新投递
①:生产者配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8092 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeNormalName: exchange.normal.4 |
| queueNormalName: queue.normal.4 |
| exchangeDlxName: exchange.dlx.4 |
| queueDlxName: queue.dlx.4 |
②:添加配置类
| package com.it.rabbit.config; |
| |
| |
| import org.springframework.amqp.core.*; |
| import org.springframework.beans.factory.annotation.Value; |
| import org.springframework.context.annotation.Bean; |
| import org.springframework.context.annotation.Configuration; |
| |
| import java.util.HashMap; |
| |
| |
| |
| |
| |
| |
| |
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Value ("${rabbit.queueNormalName}") |
| private String queueNormalName; |
| |
| @Value ("${rabbit.exchangeDlxName}") |
| private String exchangeDlxName; |
| |
| @Value ("${rabbit.queueDlxName}") |
| private String queueDlxName; |
| |
| |
| @Bean |
| public DirectExchange topicNormalExchange(){ |
| return ExchangeBuilder.directExchange(exchangeNormalName).build(); |
| } |
| |
| @Bean |
| public DirectExchange topicDlxExchange(){ |
| return ExchangeBuilder.directExchange(exchangeDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Queue queueNormal(){ |
| HashMap map = new HashMap(); |
| map.put("x-dead-letter-exchange",exchangeDlxName); |
| map.put("x-dead-letter-routing-key","error"); |
| return QueueBuilder.durable(queueNormalName).withArguments(map).build(); |
| } |
| |
| @Bean |
| public Queue queueDlx(){ |
| return QueueBuilder.durable(queueDlxName).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){ |
| return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order"); |
| } |
| |
| @Bean |
| public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){ |
| return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error"); |
| } |
| } |
③:发送消息
| package com.it.rabbit.service; |
| |
| import lombok.SneakyThrows; |
| import lombok.extern.slf4j.Slf4j; |
| import org.springframework.amqp.core.Message; |
| import org.springframework.amqp.core.MessageBuilder; |
| import org.springframework.amqp.core.MessageProperties; |
| import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.beans.factory.annotation.Value; |
| import org.springframework.stereotype.Service; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.Date; |
| |
| |
| |
| |
| |
| |
| |
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeNormalName}") |
| private String exchangeNormalName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg () { |
| String sir = "这是第1个消息"; |
| Message message = MessageBuilder.withBody(sir.getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeNormalName, "order", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动类发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:消费者配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8092 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| listener: |
| simple: |
| acknowledge-mode: manual |
⑥:接收消息
| package com.it.receiver.serive; |
| |
| import com.rabbitmq.client.Channel; |
| import lombok.extern.slf4j.Slf4j; |
| import org.springframework.amqp.core.ExchangeBuilder; |
| import org.springframework.amqp.core.Message; |
| import org.springframework.amqp.core.MessageProperties; |
| import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| import org.springframework.beans.factory.annotation.Value; |
| import org.springframework.stereotype.Component; |
| |
| import javax.xml.crypto.Data; |
| import java.io.IOException; |
| import java.util.Date; |
| |
| |
| |
| |
| |
| |
| |
| |
| @Component |
| @Slf4j |
| public class ReceiverMsg { |
| @RabbitListener (queues = {"queue.normal.4"}) |
| public void receiverMsg (Message message, Channel channel) throws IOException { |
| |
| MessageProperties properties = message.getMessageProperties(); |
| |
| long tag = properties.getDeliveryTag(); |
| try { |
| byte[] body = message.getBody(); |
| String msg = new String(body); |
| log.info("接收到消息{},时间为:{}", msg, new Date()); |
| int i= 1/0; |
| |
| channel.basicAck(tag,false); |
| }catch (Exception e){ |
| e.printStackTrace(); |
| |
| channel.basicNack(tag,false,false); |
| } |
| } |
| } |
⑦:效果


5. 消费者拒绝消息

五、RabbitMQ延迟队列
1. 采用消息中间件(rabbitmq)
问题:先发送的消息,延时时间长,会影响到后面的消息

①:引入依赖和配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8093 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.delay.1 |
| queueNormalName: queue.delay.normal.1 |
| queueDlxName: queue.delay.dlx.1 |
②:添加配置类
| package com.it.rabbit.config; |
| |
| |
| import org.springframework.amqp.core.*; |
| import org.springframework.beans.factory.annotation.Value; |
| import org.springframework.context.annotation.Bean; |
| import org.springframework.context.annotation.Configuration; |
| |
| |
| |
| |
| |
| |
| |
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Value ("${rabbit.queueNormalName}") |
| private String queueNormalName; |
| |
| @Value ("${rabbit.queueDlxName}") |
| private String queueDlxName; |
| |
| |
| @Bean |
| public TopicExchange topicExchange () { |
| return ExchangeBuilder |
| .topicExchange(exchangeName) |
| .build(); |
| } |
| |
| |
| @Bean |
| public Queue queueNormal () { |
| return QueueBuilder |
| .durable(queueNormalName) |
| .ttl(25000) |
| .deadLetterExchange(exchangeName) |
| .deadLetterRoutingKey("error") |
| .build(); |
| } |
| |
| |
| @Bean |
| public Queue queueDlx () { |
| return QueueBuilder.durable(queueDlxName).build(); |
| } |
| |
| |
| |
| @Bean |
| public Binding bindingNormal (Queue queueNormal, TopicExchange topicExchange) { |
| return BindingBuilder.bind(queueNormal).to(topicExchange).with("order"); |
| } |
| |
| |
| @Bean |
| public Binding bindingDlx (Queue queueDlx, TopicExchange topicExchange) { |
| return BindingBuilder.bind(queueDlx).to(topicExchange).with("error"); |
| } |
| |
| } |
③:发送消息
| package com.it.rabbit.service; |
| |
| import lombok.extern.slf4j.Slf4j; |
| import org.springframework.amqp.core.Message; |
| import org.springframework.amqp.core.MessageBuilder; |
| import org.springframework.amqp.core.MessageProperties; |
| import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.beans.factory.annotation.Value; |
| import org.springframework.stereotype.Service; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.Date; |
| |
| |
| |
| |
| |
| |
| |
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| Message message = MessageBuilder.withBody("这是一个延时队列的消息".getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "order", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:配置启动类发送消息
| @SpringBootApplication |
| public class Rabbit03TopicApplication implements ApplicationRunner { |
| @Autowired |
| private MessageService messageService; |
| |
| public static void main (String[] args) { |
| SpringApplication.run(Rabbit03TopicApplication.class, args); |
| } |
| |
| @Override |
| public void run (ApplicationArguments args) throws Exception { |
| messageService.sendMsg(); |
| } |
| } |
⑤:效果


2. 使用插件(rabbitmq-delayed-message-exchange)

①:下载插件
1. 官网:github.com/rabbitmq/ra…


②:安装插件
1. 将插件复制到linux中rabbitmq的/plugins目录下

2. 解压 unzip rabbitmq_delayed_message_exchange-3.9.0.ez


③:启动插件
1. 可以先使用命令列出所有的插件名字 rabbitmq_delayed_message_exchange

2.启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange


** 3.可以看到多了一种交换机类型**

④: 引入依赖与配置

| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8094 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.delay.2 |
| queueDelayName: queue.delay.2 |
⑤:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Value ("${rabbit.queueDelayName}") |
| private String queueDelayName; |
| |
| |
| @Bean |
| public CustomExchange customExchange () { |
| HashMap arguments = new HashMap(); |
| arguments.put("x-delayed-type", "direct"); |
| return new CustomExchange(exchangeName, "x-delayed-message", true, false, arguments); |
| } |
| |
| |
| @Bean |
| public Queue queueNormal () { |
| return QueueBuilder |
| .durable(queueDelayName) |
| .build(); |
| } |
| |
| |
| @Bean |
| public Binding bindingNormal (Queue queueNormal, CustomExchange customExchange) { |
| |
| return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs(); |
| } |
| |
| } |
⑥:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg () { |
| { |
| MessageProperties properties = new MessageProperties(); |
| properties.setHeader("x-delay", 25000); |
| Message message = MessageBuilder.withBody("这是一个延时25秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "plugin", message); |
| log.info("延时25秒消息发送成!{}", new Date()); |
| } |
| { |
| MessageProperties properties = new MessageProperties(); |
| properties.setHeader("x-delay", 15000); |
| Message message = MessageBuilder.withBody("这是一个延时15秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "plugin", message); |
| log.info("延时15秒消息发送成!{}", new Date()); |
| } |
| |
| } |
| } |
⑦:接收消息
| @Component |
| @Slf4j |
| public class ReceiveMessage { |
| |
| @RabbitListener (queues = "queue.delay.2") |
| public void receiveMessage (Message message) { |
| byte[] body = message.getBody(); |
| String msg = new String(body); |
| log.info("接收到消息:{}, 时间为:{}", msg, new Date()); |
| } |
| } |
⑧:可以看到消息延时时间长的并没有影响到消息延时时间短的

六、消息可靠性(消息到达交换机)
1. confirm确认模式

①:配置文件(开启确认模式)
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8097 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| publisher-confirm-type: correlated |
| |
| rabbit: |
| exchangeName: exchange.confirm.1 |
| queueName: queue.confirm.1 |
②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| |
| @Value ("${rabbit.queueName}") |
| private String queueName; |
| |
| |
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder.topicExchange(exchangeName).build(); |
| } |
| |
| @Bean |
| public Queue queue(){ |
| return QueueBuilder.durable(queueName).build(); |
| } |
| |
| |
| |
| @Bean |
| public Binding binding1(Queue queue, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue).to(topicExchange).with("info"); |
| } |
| } |
| @Component |
| @Slf4j |
| public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback { |
| @Override |
| public void confirm (CorrelationData correlationData, boolean ack, String cause) { |
| if (ack){ |
| log.info("消息正确的达到交换机!"); |
| return; |
| } |
| log.error("消息没有到达交换机,原因为:{}", cause); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| @Autowired |
| private MyConfirmCallBack myConfirmCallBack; |
| |
| @PostConstruct |
| public void init(){ |
| rabbitTemplate.setConfirmCallback(myConfirmCallBack); |
| } |
| |
| public void sendMsg(){ |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build(); |
| CorrelationData correlationData = new CorrelationData(); |
| correlationData.setId("id_110"); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info", message,correlationData); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:测试
1.发送消息时 故意将交换机名字写错 模拟错误

2.正常测试

2.confirm确认模式(lambda表达式写法)

MessageService(发送消息类)
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @PostConstruct |
| public void init () { |
| rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { |
| if (ack) { |
| log.info("消息正确的达到交换机!"); |
| return; |
| } |
| log.error("消息没有到达交换机,原因为:{}", cause); |
| }); |
| } |
| |
| public void sendMsg () { |
| Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build(); |
| CorrelationData correlationData = new CorrelationData(); |
| correlationData.setId("id_110"); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info", message, correlationData); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
七、消息可靠性(交换机到队列)

1. Return模式

①:配置文件(开启确认模式)
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8098 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| publisher-returns: true |
| |
| rabbit: |
| exchangeName: exchange.return.1 |
| queueName: queue.return.1 |
②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| |
| @Value ("${rabbit.queueName}") |
| private String queueName; |
| |
| |
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder.topicExchange(exchangeName).build(); |
| } |
| |
| @Bean |
| public Queue queue(){ |
| return QueueBuilder.durable(queueName).build(); |
| } |
| |
| |
| |
| @Bean |
| public Binding binding1(Queue queue, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue).to(topicExchange).with("info"); |
| } |
| } |
| @Component |
| @Slf4j |
| public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback { |
| |
| |
| @Override |
| public void returnedMessage (ReturnedMessage returned) { |
| log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText()); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| @Autowired |
| private MyReturnCallBack myReturnCallBack; |
| |
| @PostConstruct |
| public void init(){ |
| rabbitTemplate.setReturnsCallback(myReturnCallBack); |
| } |
| |
| public void sendMsg(){ |
| Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info11", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:测试
1.发送消息时 故意将路由key写错 模拟错误


2.正常测试(消息发送成功,没有报错)


2. Return模式(lambda表达式写法)

MessageService(发送消息类)
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| @PostConstruct |
| public void init () { |
| rabbitTemplate.setReturnsCallback( |
| returned -> log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText()) |
| ); |
| } |
| |
| public void sendMsg () { |
| Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
3. 交换机属性
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder. |
| topicExchange(exchangeName) |
| .durable(false) |
| .autoDelete() |
| .build(); |
| } |

八、备用交换机


1. 实现
①:引入依赖与配置
| |
| org.springframework.boot |
| spring-boot-starter-amqp |
| |
| server: |
| port: 8110 |
| |
| spring: |
| rabbitmq: |
| host: 192.168.100.134 |
| port: 5672 |
| username: Coke |
| password: 123456 |
| |
| rabbit: |
| exchangeName: exchange.alternate.normal.1 |
| exchangeAlternateName: exchange.alternate.alternate.1 |
| queueName: queue.alternate.normal.1 |
| queueAlternateName: queue.alternate.alternate.1 |
②:添加配置类
| @Configuration |
| public class RabbitConfig { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Value ("${rabbit.queueName}") |
| private String queueName; |
| |
| @Value ("${rabbit.exchangeAlternateName}") |
| private String exchangeAlternateName; |
| |
| @Value ("${rabbit.queueAlternateName}") |
| private String queueAlternateName; |
| |
| |
| |
| @Bean |
| public TopicExchange topicExchange(){ |
| return ExchangeBuilder. |
| topicExchange(exchangeName) |
| .alternate(exchangeAlternateName) |
| .build(); |
| } |
| |
| @Bean |
| public Queue queue(){ |
| return QueueBuilder.durable(queueName).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding1(Queue queue, TopicExchange topicExchange){ |
| return BindingBuilder.bind(queue).to(topicExchange).with("info"); |
| } |
| |
| |
| @Bean |
| public FanoutExchange fanoutExchange(){ |
| return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build(); |
| } |
| |
| |
| @Bean |
| public Queue queueAlternate(){ |
| return QueueBuilder.durable(queueAlternateName).build(); |
| } |
| |
| |
| @Bean |
| public Binding binding(Queue queueAlternate, FanoutExchange fanoutExchange){ |
| return BindingBuilder.bind(queueAlternate).to(fanoutExchange); |
| } |
| } |
③:发送消息
| @Service |
| @Slf4j |
| public class MessageService { |
| |
| @Value ("${rabbit.exchangeName}") |
| private String exchangeName; |
| |
| @Autowired |
| private RabbitTemplate rabbitTemplate; |
| |
| public void sendMsg(){ |
| Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build(); |
| |
| rabbitTemplate.convertAndSend(exchangeName, "info", message); |
| log.info("消息发送成!{}", new Date()); |
| } |
| } |
④:测试效果
1. 正常测试(指定正确路由key)


2.非正常测试(指定错误的路由key)通过备用交换机进入备用队列中


