RabbitMQ消息中间件

一、安装 RabbitMQ

①:安装

安装笔记:juejin.cn/post/726153…

②:RabbitMq相关命令

1. 创建用户

sudo rabbitmqctl add_user 用户名 密码
sudo rabbitmqctl add_user Coke 123456

image.png

2. 查看所有用户

sudo rabbitmqctl list_users

image.png

3. 设置用户角色

sudo rabbitmqctl set_user_tags Coke administrator

image.png

4. 设置用户权限

sudo rabbitmqctl set_permissions Coke 读 写 配置
sudo rabbitmqctl set_permissions Coke ".*" ".*" ".*"

image.png

5. 查看用户权限

sudo rabbitmqctl list_permissions

image.png

③:RabbitMQ工作模型

image.png

broker相当于mysq1服务器,virtual host相当于数据库(可以有多个数据库)
queue相当于表,消息相当于记录。

消息队列有三个核心要素:消息生产者、消息队列、消息消费者;

  • 生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)

  • 消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)

  • 代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;

  • 连接(Connection):连接RabbitMQ服务器的TCP长连接;

  • 信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道
    进行的;

虚拟主机(Virtual host): 一个虚拟分组,在代码中就是一个字符串,当多个不同的用
户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual
host创建exchange/queue等;(分类比较清晰、相互隔离)

交换机(Exchange): 交换机负责从生产者接收消息,并根据交换机类型分发到对应的消
息队列中,起到一个路由的作用;

路由键(Routing Key): 交换机根据路由键来决定消息分发到哪个队列,路由键是消息
的目的地址;

绑定(Binding): 绑定是队列和交换机的一个关联连接(关联关系);

队列(Queue): 存储消息的缓存;

消息(Message): 由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,
字符串、user对象,json串等等)

二、RabbitMQ -- 交换机

1:交换机类型

1、Fanout Exchange(扇形)

2、Direct Exchange(直连)

3、Topic Exchange(主题)

4、Headers Exchange(头部)

2:扇形交换机(Fanout)

Fanout扇形的,散开的;
扇形交换机投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

image.png

①:导入RabbitMQ依赖

org.springframework.boot
spring-boot-starter-amqp

②:配置连接信息

server:
port: 8801
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456

③:添加配置类

定义交换机、队列以及绑定队列与交换机

image.png

@Configuration
public class RabbitConfig {
// 1. 定义交换机
// 2. 定义队列
// 3. 绑定队列
public static String FANOUT_EXCHANGE_NAME = "fanout_exchange_1";
public static String QUEUE_NAME_1 = "queue_name_1";
public static String QUEUE_NAME_2 = "queue_name_2";
/**
* 定于交换机
* @DateTime: 2023/8/6 22:48
*
* @return FanoutExchange
* @author: Coke
*/
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).build();
}
/**
* 定义队列1
* @DateTime: 2023/8/6 22:50
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue1(){
return QueueBuilder.durable(QUEUE_NAME_1).build();
}
/**
* 定义队列2
* @DateTime: 2023/8/6 22:51
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue2(){
return QueueBuilder.durable(QUEUE_NAME_2).build();
}
/**
* 交换机 与 队列1 绑定
* @DateTime: 2023/8/6 22:52
*
* @param fanoutExchange:
* @param queue1:
* @return Binding
* @author: Coke
*/
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
/**
* 交换机 与 队列2 绑定
* @DateTime: 2023/8/6 22:52
*
* @param fanoutExchange:
* @param queue2:
* @return Binding
* @author: Coke
*/
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}

④:发送消息

RabbitMQ消息中间件-每日运维

@Component
@Slf4j
public class SendMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
// 定义发送消息
Message message = new Message("这是扇形交换机发送的1条消息".getBytes());
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE_NAME,"",message);
log.info("消息发送成功!{}",new Date());
}
}

⑤:配置启动程序后 发送消息

@SpringBootApplication
public class Rabbit01FanoutApplication implements ApplicationRunner {
@Autowired
private SendMessage sendMessage;
public static void main (String[] args) {
SpringApplication.run(Rabbit01FanoutApplication.class, args);
}
/**
* 程序启动时发送消息(程序一启动就会运行该方法)
* @DateTime: 2023/8/6 22:59
*
* @param args:
* @return void
* @author: Coke
*/
@Override
public void run (ApplicationArguments args) throws Exception {
sendMessage.sendMessage();
}
}

启动程序(消息发送成功)

image.png

每个队列中都有一条消息

image.png

⑥:接收消息

这里新建一个模块用来接收消息(一般情况生产者和消费者都是分开的)

RabbitMQ消息中间件-每日运维

01. 引入依赖和配置

引入依赖和配置和发送端一样的(改一下端口号即可)

02. 接收消息

创建ReceiveMessage接收消息的类

@Slf4j
public class ReceiveMessage {
public static final String QUEUE_NAME_1 = "queue_name_1";
public static final String QUEUE_NAME_2 = "queue_name_2";
/**
* 接收消息
* @DateTime: 2023/8/13 15:53
*
* @param message:
* @return void
* @author: Coke
*/
// queues = {QUEUE_NAME_1, QUEUE_NAME_2} 指定需要接收哪个队列中的消息(可以是多个)
@RabbitListener (queues = {QUEUE_NAME_1, QUEUE_NAME_2})
public void receiveMessage (Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("{} 接收到一条消息:{}", new Date(), msg);
}
}

启动项目即可

  • 接收到了两条消息

image.png

  • 队列中已经没有消息了

image.png

3:直连交换机(Direct)

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

①:引入依赖和配置

引入依赖和配置和扇形交换机一样的(改一下端口号即可)

②:添加配置类

定义交换机、队列以及绑定队列与交换机

@Configuration
public class RabbitConfig {
// 交换机和对队列名
public static String DIRECT_EXCHANGE_NAME = "direct_exchange_1";
public static String DIRECT_QUEUE_1 = "direct_queue_1";
public static String DIRECT_QUEUE_2 = "direct_queue_2";
/**
* 定义直连交换机
* @DateTime: 2023/8/13 21:22
*
* @return DirectExchange
* @author: Coke
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).build();
}
/**
* 定义队列1
* @DateTime: 2023/8/13 21:23
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue1(){
return QueueBuilder.durable(DIRECT_QUEUE_1).build();
}
/**
* 定义队列2
* @DateTime: 2023/8/13 21:23
*
* @return Queue
* @author: Coke
*/
@Bean
public Queue queue2(){
return QueueBuilder.durable(DIRECT_QUEUE_2).build();
}
// 绑定交换机与队列1 key= "error"
@Bean
public Binding bindingQueue1(DirectExchange directExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(directExchange).with("error");
}
// 绑定交换机与队列2 key= "info"
@Bean
public Binding bindingQueue2(DirectExchange directExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("info");
}
// 绑定交换机与队列2 key= "error"
@Bean
public Binding bindingQueue3(DirectExchange directExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("error");
}
// 绑定交换机与队列2 key= "warning"
@Bean
public Binding bindingQueue4(DirectExchange directExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("warning");
}
}

③:发送消息

RabbitMQ消息中间件-每日运维

@Slf4j
@Component
public class SendMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
// 创建发送的消息体
String msg = "这是直连交换机的一条消息";
Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
// 发送
// convertAndSend(String exchange, String routingKey, Object message)
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE_NAME, "error", message);
log.info("消息发送成功:{}", new Date());
}
}

④:配置启动程序后 发送消息

@SpringBootApplication
public class Rabbit02DirectApplication implements ApplicationRunner {
@Autowired
private SendMessage sendMessage;
public static void main (String[] args) {
SpringApplication.run(Rabbit02DirectApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
sendMessage.sendMsg();
}
}

消息发送成功,配置的key="error" 所以两个队列都可以接收到消息"

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

⑤:改变key="info"再次测试

RabbitMQ消息中间件-每日运维

消息发送成功,配置的key="info" 所以只有队列2可以接收到消息"
RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

4:主题交换机(Topic)

RabbitMQ消息中间件-每日运维

image.png

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8084
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.topic
queueName1: queue.topic.1
queueName2: queue.topic.2

RabbitMQ消息中间件-每日运维

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("rabbit.exchangeName")
private String exchangeName;
@Value ("${rabbit.queueName1}")
private String queueName1;
@Value ("${rabbit.queueName2}")
private String queueName2;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列1
@Bean
public Queue queue1(){
return QueueBuilder.durable(queueName1).build();
}
// 定义队列2
@Bean
public Queue queue2(){
return QueueBuilder.durable(queueName2).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue1, TopicExchange topicExchange){
return BindingBuilder.bind(queue1).to(topicExchange).with("*.orange.*");
}
@Bean
public Binding binding2(Queue queue2, TopicExchange topicExchange){
return BindingBuilder.bind(queue2).to(topicExchange).with("*.*.rabbit");
}
@Bean
public Binding binding3(Queue queue2, TopicExchange topicExchange){
return BindingBuilder.bind(queue2).to(topicExchange).with("lazy.#");
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("主题交换机".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "lazy.orange.rabbit", message);
log.info("消息发送成!");
}
}

④:配置启动程序后 发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

消息发送成功,配置的key="lazy.orange.rabbit" 所以两个队列都可以接收到消息"

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

5:头部交换机(Headers)

RabbitMQ消息中间件-每日运维

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8085
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.headers
queueName1: queue.headers.1
queueName2: queue.headers.2

RabbitMQ消息中间件-每日运维

②:添加配置类

@Configuration
public class RabbitConfig {
@Value("${rabbit.exchangeName}")
private String exchangeName;
@Value("${rabbit.queueName1}")
private String queueName1;
@Value("${rabbit.queueName2}")
private String queueName2;
@Bean
public HeadersExchange headersExchange(){
return ExchangeBuilder.headersExchange(exchangeName).build();
}
@Bean
public Queue queue1(){
return QueueBuilder.durable(queueName1).build();
}
@Bean
public Queue queue2(){
return QueueBuilder.durable(queueName2).build();
}
@Bean
public Binding bindingA(HeadersExchange headersExchange, Queue queue1){
HashMap map = new HashMap();
map.put("type", "m");
map.put("status",1);
return BindingBuilder.bind(queue1).to(headersExchange).whereAll(map).match();
}
@Bean
public Binding bindingB(HeadersExchange headersExchange, Queue queue2){
HashMap map = new HashMap();
map.put("type", "s");
map.put("status",0);
return BindingBuilder.bind(queue2).to(headersExchange).whereAll(map).match();
}
}

③:发送消息

@Service
public class MessageService {
@Value("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
// 设置消息头属性与队列 2 一样
MessageProperties properties = new MessageProperties();
properties.setHeader("type", "s");
properties.setHeader("status", 0);
Message message = MessageBuilder.withBody("这是头部交换机的一个消息".getBytes())
.andProperties(properties).build();
// 头部交换机 可以不设置路由key
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}

④:配置启动程序后 发送消息

@SpringBootApplication
@Slf4j
public class Rabbit04HeadersApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit04HeadersApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
log.info("消息发送成功!{}", new Date());
}
}

⑤:效果

只有队列2收到了消息

RabbitMQ消息中间件-每日运维

三、RabbitMQ过期消息

1. 设置单条消息的过期时间

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8086
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.ttl1
queueName: queue.ttl.1

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
MessageProperties properties = new MessageProperties();
// 设置消息过期时间为20秒
properties.setExpiration("20000");
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8))
.andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:效果

RabbitMQ消息中间件-每日运维

1.可以看到队列中有一条消息

RabbitMQ消息中间件-每日运维

2. 查看消息的属性

RabbitMQ消息中间件-每日运维

3.时间到后如果消息没有被取出来就会自动删除

image.png

2. 通过队列属性设置消息的过期时间

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8087
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.ttl2
queueName: queue.ttl.2

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).ttl(20000).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
MessageProperties properties = new MessageProperties();
// 设置消息过期时间为20秒
properties.setExpiration("20000");
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8))
.andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:效果

image.png

1.可以看到队列中有一条消息

RabbitMQ消息中间件-每日运维

2. 查看队列和消息的属性

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

3.时间到后如果消息没有被取出来就会自动删除

RabbitMQ消息中间件-每日运维

四、RabbitMQ死信队列

image.png

1. 队列过期

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8089
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.1 # 正常交换机
queueNormalName: queue.normal.1 # 正常队列
exchangeDlxName: exchange.dlx.1 # 死信交换机
queueDlxName: queue.dlx.1 # 死信队列

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-message-ttl",20000); // 设置队列的过期时间 (20秒)
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:效果

  • 1. 消息发送成功

RabbitMQ消息中间件-每日运维

  • 2. 消息进入到正常队列中

RabbitMQ消息中间件-每日运维

  • 3. 20秒后消息进入到死信队列中

image.png

2. 消息过期

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8090
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.2 # 正常交换机
queueNormalName: queue.normal.2 # 正常队列
exchangeDlxName: exchange.dlx.2 # 死信交换机
queueDlxName: queue.dlx.2 # 死信队列

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
MessageProperties properties = new MessageProperties();
properties.setExpiration("20000"); // 设置消息过期时间为20秒
Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:效果

  • 1. 消息发送成功

RabbitMQ消息中间件-每日运维

  • 2. 消息进入到正常队列中

image.png

  • 3. 20秒后消息进入到死信队列中

RabbitMQ消息中间件-每日运维

3. 队列达到最大长度

①:引入依赖和配置

引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8090
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.2 # 正常交换机
queueNormalName: queue.normal.2 # 正常队列
exchangeDlxName: exchange.dlx.2 # 死信交换机
queueDlxName: queue.dlx.2 # 死信队列

image.png

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入死信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
MessageProperties properties = new MessageProperties();
properties.setExpiration("20000"); // 设置消息过期时间为20秒
Message message = MessageBuilder.withBody("这是一个带有过期时间的死信消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动程序后发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:效果

  • 1. 消息发送成功

image.png

  • 2. 有5条消息保留在正常队列中 有3条消息保留在死信队列中

RabbitMQ消息中间件-每日运维

  • 3. 队列(先进先出)进入死信队列的3条消息是前3条

RabbitMQ消息中间件-每日运维

4. 消费者拒绝消息不进行重新投递

①:生产者配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8092
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeNormalName: exchange.normal.4 # 正常交换机
queueNormalName: queue.normal.4 # 正常队列
exchangeDlxName: exchange.dlx.4 # 私信交换机
queueDlxName: queue.dlx.4 # 私信队列

②:添加配置类

package com.it.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:15
**/
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.exchangeDlxName}")
private String exchangeDlxName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public DirectExchange topicNormalExchange(){
return ExchangeBuilder.directExchange(exchangeNormalName).build();
}
@Bean
public DirectExchange topicDlxExchange(){
return ExchangeBuilder.directExchange(exchangeDlxName).build();
}
// 定义队列
@Bean
public Queue queueNormal(){
HashMap map = new HashMap();
map.put("x-dead-letter-exchange",exchangeDlxName); // 设置 队列的死信交换机(消息过期后进入私信交换机)
map.put("x-dead-letter-routing-key","error"); // 设置死信路由key
return QueueBuilder.durable(queueNormalName).withArguments(map).build();
}
@Bean
public Queue queueDlx(){
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queueNormal, DirectExchange topicNormalExchange){
return BindingBuilder.bind(queueNormal).to(topicNormalExchange).with("order");
}
@Bean
public Binding binding2(Queue queueDlx, DirectExchange topicDlxExchange){
return BindingBuilder.bind(queueDlx).to(topicDlxExchange).with("error");
}
}

③:发送消息

package com.it.rabbit.service;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:25
**/
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeNormalName}")
private String exchangeNormalName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
String sir = "这是第1个消息";
Message message = MessageBuilder.withBody(sir.getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeNormalName, "order", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动类发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:消费者配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8092
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
listener:
simple:
acknowledge-mode: manual # 开启消费者手动确认

⑥:接收消息

package com.it.receiver.serive;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.io.IOException;
import java.util.Date;
/**
* 接收消息
*
* @author: Coke
* @DateTime: 2023/09/16/16:33
**/
@Component
@Slf4j
public class ReceiverMsg {
@RabbitListener (queues = {"queue.normal.4"})
public void receiverMsg (Message message, Channel channel) throws IOException {
// 获取消息属性
MessageProperties properties = message.getMessageProperties();
// 获取消息的唯一标识(类似身份证号)
long tag = properties.getDeliveryTag();
try {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到消息{},时间为:{}", msg, new Date());
int i= 1/0;
// 消费者手动确认 tag: 消息唯一标识 false: 只确认当前消息(true:批量确认)
channel.basicAck(tag,false);
}catch (Exception e){
e.printStackTrace();
// tag: 消息唯一标识 false: 只确认当前消息 false: 不重新入队
channel.basicNack(tag,false,false);
}
}
}

⑦:效果

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

5. 消费者拒绝消息

  • 只需要将接收消费中的channel.basicNack(tag,false,false);改为channel.basicReject(tag, false);

  • 可参考:4. 消费者拒绝消息不进行重新投递

image.png

五、RabbitMQ延迟队列

1. 采用消息中间件(rabbitmq)

问题:先发送的消息,延时时间长,会影响到后面的消息

RabbitMQ消息中间件-每日运维

①:引入依赖和配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8093
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.delay.1 # 既是正常交换机 又是 死信交换机
queueNormalName: queue.delay.normal.1
queueDlxName: queue.delay.dlx.1

②:添加配置类

package com.it.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:15
**/
@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueNormalName}")
private String queueNormalName;
@Value ("${rabbit.queueDlxName}")
private String queueDlxName;
// 定义交换机
@Bean
public TopicExchange topicExchange () {
return ExchangeBuilder
.topicExchange(exchangeName)
.build();
}
// 定义正常队列
@Bean
public Queue queueNormal () {
return QueueBuilder
.durable(queueNormalName) // 队列名称
.ttl(25000) // 队列过期时间
.deadLetterExchange(exchangeName) // 设置死信交换机
.deadLetterRoutingKey("error") // 设置死信路由key
.build();
}
// 定义死信队列
@Bean
public Queue queueDlx () {
return QueueBuilder.durable(queueDlxName).build();
}
// 绑定正常队列与交换机
@Bean
public Binding bindingNormal (Queue queueNormal, TopicExchange topicExchange) {
return BindingBuilder.bind(queueNormal).to(topicExchange).with("order");
}
// 绑定正常死信与交换机
@Bean
public Binding bindingDlx (Queue queueDlx, TopicExchange topicExchange) {
return BindingBuilder.bind(queueDlx).to(topicExchange).with("error");
}
}

③:发送消息

package com.it.rabbit.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
* TODO
*
* @author: Coke
* @DateTime: 2023/08/20/19:25
**/
@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个延时队列的消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "order", message);
log.info("消息发送成!{}", new Date());
}
}

④:配置启动类发送消息

@SpringBootApplication
public class Rabbit03TopicApplication implements ApplicationRunner {
@Autowired
private MessageService messageService;
public static void main (String[] args) {
SpringApplication.run(Rabbit03TopicApplication.class, args);
}
@Override
public void run (ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}

⑤:效果

  • 正常队列中有一条消息

RabbitMQ消息中间件-每日运维

  • 25秒后消息进入到死信队列中

RabbitMQ消息中间件-每日运维

2. 使用插件(rabbitmq-delayed-message-exchange)

RabbitMQ消息中间件-每日运维

①:下载插件

1. 官网:github.com/rabbitmq/ra…

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

②:安装插件

1. 将插件复制到linux中rabbitmq的/plugins目录下

RabbitMQ消息中间件-每日运维

2. 解压 unzip rabbitmq_delayed_message_exchange-3.9.0.ez

image.png

  • 解压后

RabbitMQ消息中间件-每日运维

③:启动插件

1. 可以先使用命令列出所有的插件名字 rabbitmq_delayed_message_exchange

RabbitMQ消息中间件-每日运维

2.启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

** 3.可以看到多了一种交换机类型**

RabbitMQ消息中间件-每日运维

④: 引入依赖与配置

image.png

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8094
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.delay.2 # 正常交换机
queueDelayName: queue.delay.2 # 正常队列

⑤:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueDelayName}")
private String queueDelayName;
// 定义交换机 (自定义交换机)
@Bean
public CustomExchange customExchange () {
HashMap arguments = new HashMap();
arguments.put("x-delayed-type", "direct"); // 放一个参数
return new CustomExchange(exchangeName, "x-delayed-message", true, false, arguments);
}
// 定义正常队列
@Bean
public Queue queueNormal () {
return QueueBuilder
.durable(queueDelayName) // 队列名称
.build();
}
// 绑定正常队列与交换机
@Bean
public Binding bindingNormal (Queue queueNormal, CustomExchange customExchange) {
// 绑定 需要指定路由key
return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs();
}
}

⑥:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg () {
{
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 25000); // 延时25秒
Message message = MessageBuilder.withBody("这是一个延时25秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "plugin", message);
log.info("延时25秒消息发送成!{}", new Date());
}
{
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 15000); // 延时15秒
Message message = MessageBuilder.withBody("这是一个延时15秒的消息消息".getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "plugin", message);
log.info("延时15秒消息发送成!{}", new Date());
}
}
}

⑦:接收消息

@Component
@Slf4j
public class ReceiveMessage {
@RabbitListener (queues = "queue.delay.2")
public void receiveMessage (Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到消息:{}, 时间为:{}", msg, new Date());
}
}

⑧:可以看到消息延时时间长的并没有影响到消息延时时间短的

image.png

六、消息可靠性(消息到达交换机)

1. confirm确认模式

image.png

①:配置文件(开启确认模式)

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8097
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
publisher-confirm-type: correlated # 开启生产者的消息确认模式
rabbit:
exchangeName: exchange.confirm.1
queueName: queue.confirm.1

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm (CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("消息正确的达到交换机!");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyConfirmCallBack myConfirmCallBack;
@PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
public void init(){
rabbitTemplate.setConfirmCallback(myConfirmCallBack);
}
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build();
CorrelationData correlationData = new CorrelationData();
correlationData.setId("id_110"); // 设置消息的关联id
// 参数 (交换机 路由key 消息体 相关生产者确认的数据)
rabbitTemplate.convertAndSend(exchangeName, "info", message,correlationData);
log.info("消息发送成!{}", new Date());
}
}

④:测试

1.发送消息时 故意将交换机名字写错 模拟错误

RabbitMQ消息中间件-每日运维

2.正常测试

RabbitMQ消息中间件-每日运维

2.confirm确认模式(lambda表达式写法)

RabbitMQ消息中间件-每日运维

MessageService(发送消息类)

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
/* @PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
public void init(){
rabbitTemplate.setConfirmCallback(
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm (CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("消息正确的达到交换机!");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
}
);
}*/
@PostConstruct // 该注解会在构造方法执行后执行,相当于初始化作用
public void init () {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息正确的达到交换机!");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
});
}
public void sendMsg () {
Message message = MessageBuilder.withBody("这是一个带有过期时间的消息".getBytes(StandardCharsets.UTF_8)).build();
CorrelationData correlationData = new CorrelationData();
correlationData.setId("id_110"); // 设置消息的关联id
// 参数 (交换机 路由key 消息体 相关生产者确认的数据)
rabbitTemplate.convertAndSend(exchangeName, "info", message, correlationData);
log.info("消息发送成!{}", new Date());
}
}

七、消息可靠性(交换机到队列)

image.png

1. Return模式

RabbitMQ消息中间件-每日运维

①:配置文件(开启确认模式)

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8098
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
publisher-returns: true # 开启return模式
rabbit:
exchangeName: exchange.return.1
queueName: queue.return.1

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(exchangeName).build();
}
// 定义队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
}
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
// 只有消息从交换机没有正确到达队列时才会回调
@Override
public void returnedMessage (ReturnedMessage returned) {
log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText());
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyReturnCallBack myReturnCallBack;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(myReturnCallBack);
}
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info11", message);
log.info("消息发送成!{}", new Date());
}
}

④:测试

1.发送消息时 故意将路由key写错 模拟错误

image.png

RabbitMQ消息中间件-每日运维

2.正常测试(消息发送成功,没有报错)

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

2. Return模式(lambda表达式写法)

image.png

MessageService(发送消息类)

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init () {
rabbitTemplate.setReturnsCallback(
returned -> log.error("消息从交换机未能正确到达队列,原因为:{}", returned.getReplyText())
);
}
public void sendMsg () {
Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}

3. 交换机属性

// 定义交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.
topicExchange(exchangeName)
.durable(false) // isDurable – 持久标志(默认为 true)
.autoDelete() // 设置自动删除 (默认不是自动删除)
.build();
}

RabbitMQ消息中间件-每日运维

八、备用交换机

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

1. 实现

①:引入依赖与配置

org.springframework.boot
spring-boot-starter-amqp
server:
port: 8110
spring:
rabbitmq:
host: 192.168.100.134
port: 5672
username: Coke
password: 123456
rabbit:
exchangeName: exchange.alternate.normal.1 # 正常交换机
exchangeAlternateName: exchange.alternate.alternate.1 # 备用交换机
queueName: queue.alternate.normal.1 # 正常队列
queueAlternateName: queue.alternate.alternate.1 # 备用队列

②:添加配置类

@Configuration
public class RabbitConfig {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Value ("${rabbit.queueName}")
private String queueName;
@Value ("${rabbit.exchangeAlternateName}")
private String exchangeAlternateName;
@Value ("${rabbit.queueAlternateName}")
private String queueAlternateName;
// 正常交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.
topicExchange(exchangeName)
.alternate(exchangeAlternateName) // 设置备用交换机
.build();
}
// 正常队列
@Bean
public Queue queue(){
return QueueBuilder.durable(queueName).build();
}
// 绑定正常队列与交换机
@Bean
public Binding binding1(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("info");
}
// 备用交换机
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();
}
// 备用队列
@Bean
public Queue queueAlternate(){
return QueueBuilder.durable(queueAlternateName).build();
}
// 绑定备用交换机与队列
@Bean
public Binding binding(Queue queueAlternate, FanoutExchange fanoutExchange){
return BindingBuilder.bind(queueAlternate).to(fanoutExchange);
}
}

③:发送消息

@Service
@Slf4j
public class MessageService {
@Value ("${rabbit.exchangeName}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("这是一个消息".getBytes(StandardCharsets.UTF_8)).build();
// 参数 (交换机 路由key 消息体)
rabbitTemplate.convertAndSend(exchangeName, "info", message);
log.info("消息发送成!{}", new Date());
}
}

④:测试效果

1. 正常测试(指定正确路由key)

image.png

RabbitMQ消息中间件-每日运维

2.非正常测试(指定错误的路由key)通过备用交换机进入备用队列中

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维

RabbitMQ消息中间件-每日运维