一、安装 RabbitMQ
①:安装
安装笔记:juejin.cn/post/726153…
②:RabbitMq相关命令
1. 创建用户
sudo rabbitmqctl add_user 用户名 密码
sudo rabbitmqctl add_user Coke 123456
2. 查看所有用户
sudo rabbitmqctl list_users
3. 设置用户角色
sudo rabbitmqctl set_user_tags Coke administrator
4. 设置用户权限
sudo rabbitmqctl set_permissions Coke 读 写 配置
sudo rabbitmqctl set_permissions Coke ".*" ".*" ".*"
5. 查看用户权限
sudo rabbitmqctl list_permissions
③:RabbitMQ工作模型
broker相当于mysq1服务器,virtual host相当于数据库(可以有多个数据库)
queue相当于表,消息相当于记录。
消息队列有三个核心要素:消息生产者、消息队列、消息消费者;
-
生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)
-
消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)
-
代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;
-
连接(Connection):连接RabbitMQ服务器的TCP长连接;
-
信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道
进行的;
虚拟主机(Virtual host): 一个虚拟分组,在代码中就是一个字符串,当多个不同的用
户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual
host创建exchange/queue等;(分类比较清晰、相互隔离)
交换机(Exchange): 交换机负责从生产者接收消息,并根据交换机类型分发到对应的消
息队列中,起到一个路由的作用;
路由键(Routing Key): 交换机根据路由键来决定消息分发到哪个队列,路由键是消息
的目的地址;
绑定(Binding): 绑定是队列和交换机的一个关联连接(关联关系);
队列(Queue): 存储消息的缓存;
消息(Message): 由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,
字符串、user对象,json串等等)
二、RabbitMQ -- 交换机
1:交换机类型
1、Fanout Exchange(扇形)
2、Direct Exchange(直连)
3、Topic Exchange(主题)
4、Headers Exchange(头部)
2:扇形交换机(Fanout)
Fanout扇形的,散开的;
扇形交换机投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
①:导入RabbitMQ依赖
org.springframework.boot
spring-boot-starter-amqp
②:配置连接信息
server:
port: 8801
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
③:添加配置类
定义交换机、队列以及绑定队列与交换机
@Configuration
public class RabbitConfig {
// 1. 定义交换机
// 2. 定义队列
// 3. 绑定队列
public static String FANOUT_EXCHANGE_NAME = "fanout_exchange_1";
public static String QUEUE_NAME_1 = "queue_name_1";
public static String QUEUE_NAME_2 = "queue_name_2";
/**
* 定于交换机
* @DateTime: 2023/8/6 22:48
*
* @return FanoutExchange
* @author: Coke
*/
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).build();
}
/**
* 定义队列1
* @DateTime: 2023/8/6 22:50
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue1(){
return QueueBuilder.durable(QUEUE_NAME_1).build();
}
/**
* 定义队列2
* @DateTime: 2023/8/6 22:51
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue2(){
return QueueBuilder.durable(QUEUE_NAME_2).build();
}
/**
* 交换机 与 队列1 绑定
* @DateTime: 2023/8/6 22:52
*
* @param fanoutExchange:
* @param queue1:
* @return Binding
* @author: Coke
*/
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
/**
* 交换机 与 队列2 绑定
* @DateTime: 2023/8/6 22:52
*
* @param fanoutExchange:
* @param queue2:
* @return Binding
* @author: Coke
*/
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
④:发送消息
@Component
@Slf4j
public class SendMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
// 定义发送消息
Message message = new Message("这是扇形交换机发送的1条消息".getBytes());
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE_NAME,"",message);
log.info("消息发送成功!{}",new Date());
}
}
⑤:配置启动程序后 发送消息
@SpringBootApplication
public class Rabbit01FanoutApplication implements ApplicationRunner {
@Autowired
private SendMessage sendMessage;
public static void main (String[] args) {
SpringApplication.run(Rabbit01FanoutApplication.class, args);
}
/**
* 程序启动时发送消息(程序一启动就会运行该方法)
* @DateTime: 2023/8/6 22:59
*
* @param args:
* @return void
* @author: Coke
*/
@Override
public void run (ApplicationArguments args) throws Exception {
sendMessage.sendMessage();
}
}
启动程序(消息发送成功)
每个队列中都有一条消息
⑥:接收消息
这里新建一个模块用来接收消息(一般情况生产者和消费者都是分开的)
01. 引入依赖和配置
引入依赖和配置和发送端一样的(改一下端口号即可)
02. 接收消息
创建ReceiveMessage接收消息的类
@Slf4j
public class ReceiveMessage {
public static final String QUEUE_NAME_1 = "queue_name_1";
public static final String QUEUE_NAME_2 = "queue_name_2";
/**
* 接收消息
* @DateTime: 2023/8/13 15:53
*
* @param message:
* @return void
* @author: Coke
*/
// queues = {QUEUE_NAME_1, QUEUE_NAME_2} 指定需要接收哪个队列中的消息(可以是多个)
@RabbitListener (queues = {QUEUE_NAME_1, QUEUE_NAME_2})
public void receiveMessage (Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("{} 接收到一条消息:{}", new Date(), msg);
}
}
启动项目即可
- 接收到了两条消息
- 队列中已经没有消息了
3:直连交换机(Direct)
①:引入依赖和配置
引入依赖和配置和扇形交换机一样的(改一下端口号即可)
②:添加配置类
定义交换机、队列以及绑定队列与交换机
@Configuration
public class RabbitConfig {
// 交换机和对队列名
public static String DIRECT_EXCHANGE_NAME = "direct_exchange_1";
public static String DIRECT_QUEUE_1 = "direct_queue_1";
public static String DIRECT_QUEUE_2 = "direct_queue_2";
/**
* 定义直连交换机
* @DateTime: 2023/8/13 21:22
*
* @return DirectExchange
* @author: Coke
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).build();
}
/**
* 定义队列1
* @DateTime: 2023/8/13 21:23
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue1(){
return QueueBuilder.durable(DIRECT_QUEUE_1).build();
}
/**
* 定义队列2
* @DateTime: 2023/8/13 21:23
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue2(){
return QueueBuilder.durable(DIRECT_QUEUE_2).build();
}
// 绑定交换机与队列1 key= "error"
@Bean
public Binding bindingQueue1(DirectExchange directExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(directExchange).with("error");
}
// 绑定交换机与队列2 key= "info"
@Bean
public Binding bindingQueue2(DirectExchange directExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("info");
}
// 绑定交换机与队列2 key= "error"
@Bean
public Binding bindingQueue3(DirectExchange directExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("error");
}
// 绑定交换机与队列2 key= "warning"
@Bean
public Binding bindingQueue4(DirectExchange directExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("warning");
}
}
③:发送消息
@Slf4j
@Component
public class SendMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
// 创建发送的消息体
String msg = "这是直连交换机的一条消息";
Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
// 发送
// convertAndSend(String exchange, String routingKey, Object message)
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, "error", message);
log.info("消息发送成功:{}", new Date());
}
}
④:配置启动程序后 发送消息
@SpringBootApplication
public class Rabbit02DirectApplication implements ApplicationRunner {
@Autowired
private SendMessage sendMessage;
public static void main (String[] args) {
SpringApplication.run(Rabbit02DirectApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
sendMessage.sendMsg();
}
}
消息发送成功,配置的key="error" 所以两个队列都可以接收到消息"
⑤:改变key="info"再次测试
消息发送成功,配置的key="info" 所以只有队列2可以接收到消息"
4:主题交换机(Topic)
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8084
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.topic
queueName1: queue.topic.1
queueName2: queue.topic.2
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("rabbit.exchangeName")
private String exchangeName;
@Value ("${rabbit.queueName1}")
private String queueName1;
@Value ("${rabbit.queueName2}")
private String queueName2;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列1
@Bean
public Queue queue1(){
return QueueBuilder.durable(queueName1).build();
}
// 定义队列2
@Bean
public Queue queue2(){
return QueueBuilder.durable(queueName2).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue1, TopicExchange topicExchange){
return BindingBuilder.bind(queue1).to(topicExchange).with("*.orange.*");
}
@Bean
public Binding binding2(Queue queue2, TopicExchange topicExchange){
return BindingBuilder.bind(queue2).to(topicExchange).with("*.*.rabbit");
}
@Bean
public Binding binding3(Queue queue2, TopicExchange topicExchange){
return BindingBuilder.bind(queue2).to(topicExchange).with("lazy.#");
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("主题交换机".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "lazy.orange.rabbit", message);
log.info("消息发送成!");
}
}
④:配置启动程序后 发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
消息发送成功,配置的key="lazy.orange.rabbit" 所以两个队列都可以接收到消息"
5:头部交换机(Headers)
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8085
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.headers
queueName1: queue.headers.1
queueName2: queue.headers.2
②:添加配置类
@Configuration
public class RabbitConfig {
@Value("${rabbit.exchangeName}")
private String exchangeName;
@Value("${rabbit.queueName1}")
private String queueName1;
@Value("${rabbit.queueName2}")
private String queueName2;
@Bean
public HeadersExchange headersExchange(){
return ExchangeBuilder.headersExchange(exchangeName).build();
}
@Bean
public Queue queue1(){
return QueueBuilder.durable(queueName1).build();
}
@Bean
public Queue queue2(){
return QueueBuilder.durable(queueName2).build();
}
@Bean
public Binding bindingA(HeadersExchange headersExchange, Queue queue1){
HashMap map = new HashMap();
map.put("type", "m");
map.put("status",1);
return BindingBuilder.bind(queue1).to(headersExchange).whereAll(map).match();
}
@Bean
public Binding bindingB(HeadersExchange headersExchange, Queue queue2){
HashMap map = new HashMap();
map.put("type", "s");
map.put("status",0);
return BindingBuilder.bind(queue2).to(headersExchange).whereAll(map).match();
}
}
③:发送消息
@Service
public class MessageService {
@Value("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
// 设置消息头属性与队列 2 一样
MessageProperties properties = new MessageProperties();
properties.setHeader("type", "s");
properties.setHeader("status", 0);
Message message = MessageBuilder.withBody("这是头部交换机的一个消息".getBytes())
.andProperties(properties).build();
// 头部交换机 可以不设置路由key
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}
④:配置启动程序后 发送消息
@SpringBootApplication
@Slf4j
public class Rabbit04HeadersApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit04HeadersApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
log.info("消息发送成功!{}", new Date());
}
}
⑤:效果
只有队列2收到了消息
三、RabbitMQ过期消息
1. 设置单条消息的过期时间
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8086
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.ttl1
queueName: queue.ttl.1
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
MessageProperties properties = new MessageProperties();
// 设置消息过期时间为20秒
properties.setExpiration("20000");
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8))
.andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动程序后发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:效果
1.可以看到队列中有一条消息
2. 查看消息的属性
3.时间到后如果消息没有被取出来就会自动删除
2. 通过队列属性设置消息的过期时间
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8087
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.ttl2
queueName: queue.ttl.2
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).ttl(20000).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
MessageProperties properties = new MessageProperties();
// 设置消息过期时间为20秒
properties.setExpiration("20000");
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8))
.andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动程序后发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:效果
1.可以看到队列中有一条消息
2. 查看队列和消息的属性
3.时间到后如果消息没有被取出来就会自动删除
四、RabbitMQ死信队列
1. 队列过期
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8089
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.1 # 正常交换机
queueNormalName: queue.normal.1 # 正常队列
exchangeDlxName: exchange.dlx.1 # 死信交换机
queueDlxName: queue.dlx.1 # 死信队列
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-message-ttl",20000); // 设置队列的过期时间 (20秒)
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动程序后发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:效果
- 1. 消息发送成功
- 2. 消息进入到正常队列中
- 3. 20秒后消息进入到死信队列中
2. 消息过期
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8090
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.2 # 正常交换机
queueNormalName: queue.normal.2 # 正常队列
exchangeDlxName: exchange.dlx.2 # 死信交换机
queueDlxName: queue.dlx.2 # 死信队列
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
MessageProperties properties = new MessageProperties();
properties.setExpiration("20000"); // 设置消息过期时间为20秒
Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动程序后发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:效果
- 1. 消息发送成功
- 2. 消息进入到正常队列中
- 3. 20秒后消息进入到死信队列中
3. 队列达到最大长度
①:引入依赖和配置
引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8090
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.2 # 正常交换机
queueNormalName: queue.normal.2 # 正常队列
exchangeDlxName: exchange.dlx.2 # 死信交换机
queueDlxName: queue.dlx.2 # 死信队列
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
MessageProperties properties = new MessageProperties();
properties.setExpiration("20000"); // 设置消息过期时间为20秒
Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动程序后发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:效果
- 1. 消息发送成功
- 2. 有5条消息保留在正常队列中 有3条消息保留在死信队列中
- 3. 队列(先进先出)进入死信队列的3条消息是前3条
4. 消费者拒绝消息不进行重新投递
①:生产者配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8092
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.4 # 正常交换机
queueNormalName: queue.normal.4 # 正常队列
exchangeDlxName: exchange.dlx.4 # 私信交换机
queueDlxName: queue.dlx.4 # 私信队列
②:添加配置类
package com.it.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:15
**/
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入私信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}
③:发送消息
package com.it.rabbit.service;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:25
**/
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
String sir = "这是第1个消息";
Message message = MessageBuilder.withBody(sir.getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动类发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:消费者配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8092
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
listener:
simple:
acknowledge-mode: manual # 开启消费者手动确认
⑥:接收消息
package com.it.receiver.serive;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.io.IOException;
import java.util.Date;
/**
* 接收消息
*
* @author: Coke
* @DateTime: 2023/09/16/16:33
**/
@Component
@Slf4j
public class ReceiverMsg {
@RabbitListener (queues = {"queue.normal.4"})
public void receiverMsg (Message message, Channel channel) throws IOException {
// 获取消息属性
MessageProperties properties = message.getMessageProperties();
// 获取消息的唯一标识(类似身份证号)
long tag = properties.getDeliveryTag();
try {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到消息{},时间为:{}", msg, new Date());
int i= 1/0;
// 消费者手动确认 tag: 消息唯一标识 false: 只确认当前消息(true:批量确认)
channel.basicAck(tag,false);
}catch (Exception e){
e.printStackTrace();
// tag: 消息唯一标识 false: 只确认当前消息 false: 不重新入队
channel.basicNack(tag,false,false);
}
}
}
⑦:效果
5. 消费者拒绝消息
-
只需要将接收消费中的channel.basicNack(tag,false,false);改为channel.basicReject(tag, false);
-
可参考:4. 消费者拒绝消息不进行重新投递
五、RabbitMQ延迟队列
1. 采用消息中间件(rabbitmq)
问题:先发送的消息,延时时间长,会影响到后面的消息
①:引入依赖和配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8093
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.delay.1 # 既是正常交换机 又是 死信交换机
queueNormalName: queue.delay.normal.1
queueDlxName: queue.delay.dlx.1
②:添加配置类
package com.it.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:15
**/
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public TopicExchange topicExchange () {
return ExchangeBuilder
.topicExchange(exchangeName)
.build();
}
// 定义正常队列
@Bean
public Queue queueNormal () {
return QueueBuilder
.durable(queueNormalName) // 队列名称
.ttl(25000) // 队列过期时间
.deadLetterExchange(exchangeName) // 设置死信交换机
.deadLetterRoutingKey("error") // 设置死信路由key
.build();
}
// 定义死信队列
@Bean
public Queue queueDlx () {
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定正常队列与交换机
@Bean
public Binding bindingNormal (Queue queueNormal, TopicExchange topicExchange) {
return BindingBuilder.bind(queueNormal).to(topicExchange).with("order");
}
// 绑定正常死信与交换机
@Bean
public Binding bindingDlx (Queue queueDlx, TopicExchange topicExchange) {
return BindingBuilder.bind(queueDlx).to(topicExchange).with("error");
}
}
③:发送消息
package com.it.rabbit.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:25
**/
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个延时队列的消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "order", message);
log.info("消息发送成!{}", new Date());
}
}
④:配置启动类发送消息
@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
⑤:效果
- 正常队列中有一条消息
- 25秒后消息进入到死信队列中
2. 使用插件(rabbitmq-delayed-message-exchange)
①:下载插件
1. 官网:github.com/rabbitmq/ra…
②:安装插件
1. 将插件复制到linux中rabbitmq的/plugins目录下
2. 解压
unzip rabbitmq_delayed_message_exchange-3.9.0.ez
- 解压后
③:启动插件
1. 可以先使用命令列出所有的插件名字
rabbitmq_delayed_message_exchange
2.启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
** 3.可以看到多了一种交换机类型**
④: 引入依赖与配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8094
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.delay.2 # 正常交换机
queueDelayName: queue.delay.2 # 正常队列
⑤:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueDelayName}")
private String queueDelayName;
// 定义交换机 (自定义交换机)
@Bean
public CustomExchange customExchange () {
HashMap arguments = new HashMap();
arguments.put("x-delayed-type", "direct"); // 放一个参数
return new CustomExchange(exchangeName, "x-delayed-message", true, false, arguments);
}
// 定义正常队列
@Bean
public Queue queueNormal () {
return QueueBuilder
.durable(queueDelayName) // 队列名称
.build();
}
// 绑定正常队列与交换机
@Bean
public Binding bindingNormal (Queue queueNormal, CustomExchange customExchange) {
// 绑定 需要指定路由key
return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs();
}
}
⑥:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
{
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 25000); // 延时25秒
Message message = MessageBuilder.withBody("这是一个延时25秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "plugin", message);
log.info("延时25秒消息发送成!{}", new Date());
}
{
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 15000); // 延时15秒
Message message = MessageBuilder.withBody("这是一个延时15秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "plugin", message);
log.info("延时15秒消息发送成!{}", new Date());
}
}
}
⑦:接收消息
@Component
@Slf4j
public class ReceiveMessage {
@RabbitListener (queues = "queue.delay.2")
public void receiveMessage (Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到消息:{}, 时间为:{}", msg, new Date());
}
}
⑧:可以看到消息延时时间长的并没有影响到消息延时时间短的
六、消息可靠性(消息到达交换机)
1. confirm确认模式
①:配置文件(开启确认模式)
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8097
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
publisher-confirm-type: correlated # 开启生产者的消息确认模式
rabbit:
exchangeName: exchange.confirm.1
queueName: queue.confirm.1
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm (CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("消息正确的达到交换机!");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyConfirmCallBack myConfirmCallBack;
@PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
public void init(){
rabbitTemplate.setConfirmCallback(myConfirmCallBack);
}
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build();
CorrelationData correlationData = new CorrelationData();
correlationData.setId("id_110"); // 设置消息的关联id
// 参数 (交换机 路由key 消息体 相关生产者确认的数据)
rabbitTemplate.convertAndSend(exchangeName, "info", message,correlationData);
log.info("消息发送成!{}", new Date());
}
}
④:测试
1.发送消息时 故意将交换机名字写错 模拟错误
2.正常测试
2.confirm确认模式(lambda表达式写法)
MessageService(发送消息类)
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
/* @PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
public void init(){
rabbitTemplate.setConfirmCallback(
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm (CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("消息正确的达到交换机!");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
}
);
}*/
@PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
public void init () {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息正确的达到交换机!");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
});
}
public void sendMsg () {
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build();
CorrelationData correlationData = new CorrelationData();
correlationData.setId("id_110"); // 设置消息的关联id
// 参数 (交换机 路由key 消息体 相关生产者确认的数据)
rabbitTemplate.convertAndSend(exchangeName, "info", message, correlationData);
log.info("消息发送成!{}", new Date());
}
}
七、消息可靠性(交换机到队列)
1. Return模式
①:配置文件(开启确认模式)
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8098
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
publisher-returns: true # 开启return模式
rabbit:
exchangeName: exchange.return.1
queueName: queue.return.1
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
// 只有消息从交换机没有正确到达队列时才会回调
@Override
public void returnedMessage (ReturnedMessage returned) {
log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText());
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyReturnCallBack myReturnCallBack;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(myReturnCallBack);
}
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info11", message);
log.info("消息发送成!{}", new Date());
}
}
④:测试
1.发送消息时 故意将路由key写错 模拟错误
2.正常测试(消息发送成功,没有报错)
2. Return模式(lambda表达式写法)
MessageService(发送消息类)
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init () {
rabbitTemplate.setReturnsCallback(
returned -> log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText())
);
}
public void sendMsg () {
Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}
3. 交换机属性
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.
topicExchange(exchangeName)
.durable(false) // isDurable – 持久标志(默认为 true)
.autoDelete() // 设置自动删除 (默认不是自动删除)
.build();
}
八、备用交换机
1. 实现
①:引入依赖与配置
org.springframework.boot
spring-boot-starter-amqp
server:
port: 8110
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.alternate.normal.1 # 正常交换机
exchangeAlternateName: exchange.alternate.alternate.1 # 备用交换机
queueName: queue.alternate.normal.1 # 正常队列
queueAlternateName: queue.alternate.alternate.1 # 备用队列
②:添加配置类
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
@Value ("${rabbit.exchangeAlternateName}")
private String exchangeAlternateName;
@Value ("${rabbit.queueAlternateName}")
private String queueAlternateName;
// 正常交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.
topicExchange(exchangeName)
.alternate(exchangeAlternateName) // 设置备用交换机
.build();
}
// 正常队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定正常队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
// 备用交换机
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();
}
// 备用队列
@Bean
public Queue queueAlternate(){
return QueueBuilder.durable(queueAlternateName).build();
}
// 绑定备用交换机与队列
@Bean
public Binding binding(Queue queueAlternate, FanoutExchange fanoutExchange){
return BindingBuilder.bind(queueAlternate).to(fanoutExchange);
}
}
③:发送消息
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}
④:测试效果
1. 正常测试(指定正确路由key)
2.非正常测试(指定错误的路由key)通过备用交换机进入备用队列中