问题描述
同事说有个项目不能自动创建新队列,每次新增队列都要先在rabbitmq后台手动创建,并且只有生产环境是这样,测试环境没有这个问题。
我一听还有这种事情,这怎么能忍,本着我不入地狱谁入地狱的精神开始排查这个问题,顺便在深入了解下springboot怎么实现rabbitmq队列的自动创建。
查找原因
正常来说,消费者使用@RabbitListener注解声明了queue、exchange之后会自动创建不存在的队列,并声明队列和exchange的绑定关系。在这个项目中也是这么用的,但是为什么没有创建队列呢。
首先通过观察启动日志发现如下报错信息片段
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no queue 'test_queue' in vhost 'test_vhost', class-id=50, method-id=10)
通过报错信息能很明确知道是因为队列在vhost中不存在,channel异常关闭。通过官网也能说明这一点
rabbitmq官网对channel错误信息描述但这还是因为没有创建队列导致的结果,还要继续找没有自动创建的原因。继续查看日志发现还有一条报错信息
Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'auto_delete' for exchange 'dome_exchange' in vhost 'test_vhost': received 'false' but current is 'true', class-id=40, method-id=10)
因为这条报错信息和当前队列并没有直接关系,所以被同事忽略了。但我看到channel.close关键字心想难道就是因为这个报错导致链接关闭没有创建队列?带着这个疑问继续排查问题。
查看源码
首先确认报错原因,通过rabbitmq后台查看deme_exchange,发现auto_delete参数确实为true
rabbitmq后台查看参数
消费者代码
消费者代码中并没有声明auto_delete参数,通过查看@Exchange注解发现auto_delete默认为false
/**
* @return true if the exchange is to be declared as auto-delete.
*/
String autoDelete() default "false";
这就导致应用启动时代码中声明的exchange和远程已经存在的参数不一致,导致报406错误。通过上面官网截图也可看到对406报错的说明能够印证这一点。至于不一致的原因,后来问同事他应该是在后台手动创建的exchange,并没有注意这些参数细节。所以一定要规范操作啊,不然各种挖坑。
找到406的报错原因后,继续排查为什么队列没有自动创建。通过查看@RabbitListener注解源码,发现对queues参数的说明如下(不同springboot版本说明可能不一样)
/**
* The queues for this listener.
* The entries can be 'queue name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the queue name or {@code Queue} object.
* The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with
* a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application
* context.
* Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}.
* @return the queue names or expressions (SpEL) to listen to from target
* @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
*/
String[] queues() default {};
其中说到要么队列已经存在,或者在其他地方定义了RabbitAdmin这个bean。
继续查看RabbitAdmin源码发现几个关键信息,该类实现InitializingBean接口,该接口在Bean初始化完成后会执行指定逻辑,在InitializingBean接口下的afterPropertiesSet()方法中实现,RabbitAdmin重写的afterPropertiesSet()方法中发现会调用initialize()方法,该方法源码如下
/**
* Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
* (but unnecessary) to call this method more than once.
*/
@Override // NOSONAR complexity
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
Collection contextExchanges = new LinkedList(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection contextQueues = new LinkedList(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection contextBindings = new LinkedList(
this.applicationContext.getBeansOfType(Binding.class).values());
Collection customizers =
this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
processDeclarables(contextExchanges, contextQueues, contextBindings);
final Collection exchanges = filterDeclarables(contextExchanges, customizers);
final Collection queues = filterDeclarables(contextQueues, customizers);
final Collection bindings = filterDeclarables(contextBindings, customizers);
for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
+ "reopening the connection.");
}
}
for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
+ queue.getName()
+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
+ queue.isExclusive() + ". "
+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
+ "alive, but all messages will be lost.");
}
}
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
});
this.logger.debug("Declarations finished");
}
可以看到initialize()方法中会从spring容器中获取所有的Exchange、Queue和Binding等bean,并在declareExchanges()、declareQueues()、declareBindings()等方法中创建不存在的exchange、queues和绑定关系。这也能解释为什么要在消费者中创建相应的exchange、queue和Bingding的bean,并且自动创建队列。
当declareExchanges()方法有异常返回406时,并不会继续执行下面的declareQueues()和declareBindings()方法,所以新队列并没有自动创建。至此就找到了队列没有创建的原因以及自动创建exchange和queues相关的源码。而测试环境没有报错的原因是因为不存在exchange参数不一致的情况。
RabbitAdmin是如何自动创建的
进一步思考发现RabbitAdmin这个bean并没有在代码中创建,那么它是什么时候注入到容器中的呢。
继续追查源码发现RabbitAdmin在RabbitAutoConfiguration类中被创建并注入到容器中
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
RabbitAutoConfiguration是rabbitmq的配置类,通过注解可以发现当存在RabbitTemplate类时会启用
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
RabbitTemplate是对Rabbitmq封装的操作类,在项目中有被显式声明,所以RabbitAutoConfiguration也会被创建,至此rabbitmq自动创建队列的大致流程就被串起来了。