1.问题介绍
在Springboot项目中使用RabbitMQ作为消息中间件使用过程中,有时候我们会使用java对象作为传输的消息,默认情况下允许使用java对象作为传输的消息,如果强制性的使用Java对象作为消息(不使用json工具的情况下),会因为序列化和反序列化导出运行时异常, 但是却有着严格的条件限制:
如果在项目中开发者和消费者属于同一个项目则没有问题,如果开发者和消费者属于不同项目则第2个条件则比较麻烦,当然也有解决办法。比如实体类作为基础包(微服务/多模块项目中叫做基础模块)引入生产者和消费者项目中等等。
还有一种方法是通过在生产者中将java对象转为json字符串,在消费者中将json字符串转回java对象。这种方法有很多弊端,首先要增加代码量(互相转换),还有就是针对不同的对象需要增加转换的代码。
2.解决办法
消息序列化(Jackson2)
针对消息生产者,我们都是用RabbitTemplate发起消息,在RabbitTemplate对象中有一个方法专门用来实现Java对象的序列化,
@SpringBootTest
public class MqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void producertTest(){
// setMessageConverter 方法就是修改序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
User user = new User();
user.setName("张三");
user.setAge(23);
rabbitTemplate.convertAndSend(MqConstant.Exchange,"",user);
}
}
注意: 上面这种序列化方式太low了,如果针对多个发起消息或者在不同的地方使用,每次都还要去修改序列化,所以,我们就将RabbitTemplate配置成一个Bean,我们每次去依赖注入的也就是这个Bean。
@Configuration
public class RabbitMQConfig {
@Bean
@SuppressWarnings("all")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// 在注入Bean的时候必须将连接工厂装配进来,因为这是自己定义的一个Bean
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// setMessageConverter 方法使用jackson2去实现序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
消息反序列化(Jackson2)
消费者就是针对 @RabbitListener 这个注解反序列化,就要去实现RabbitListenerConfigurer 接口。
@Configuration
public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer{
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
// 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
}
在反序列化中,你可以使用 @Payload 注解直接将消息转换成Java对象,例如下面.
@Service
public class RabbitService {
@RabbitListener(queues = MQConstant.QUEUE)
public void message(@Payload User message){
System.out.println(message);
}
}
在后面还有很多注解可以使用,例如获取消息的id,获取消息的头。这些都是在使用spring boot整合RabbitMQ框架中最常用的方式
注意: 实现序列化和反序列化都是实现 MessageConverter 接口 ,结果一个严重的问题出现了,如果你是通过第三方json框架去封装的消息序列化,那么你就要特别注意了。
实现序列化使用的是:org.springframework.amqp.support.converter 包下的 MessageConverter
实现反序列化使用的是:org.springframework.messaging.converter 包下的MessageConverter
消息序列化(FastJson2)
Fastjson就是一个第三方框架,之所以在项目常见到它,就是因为他的解析速度快的惊人(他们官方自己说的),并且在这次项目中我们使用的是Fastjson2版本,解析速度更不用说了。
下面我们要在spring boot中使用第三方框架实现RabbitMQ的序列化问题,那么就要先去找FastJson中是否有这个实现 org.springframework.amqp.support.converter 包下的 MessageConverter 接口 或 继承 AbstractMessageConverter 抽象类的实现类,发现没有,那我们就要去实现。
com.alibaba.fastjson2
fastjson2-extension-spring5
2.0.34
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-json
import com.alibaba.fastjson2.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class FastjsonMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = JSON.toJSONBytes(object);
messageProperties.setContentType("application/json");
messageProperties.setContentEncoding("UTF-8");
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
byte[] bytes = message.getBody();
return JSON.parseObject(bytes, Object.class);
}
}
import com.alibaba.fastjson2.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
public class FastjsonMessageConverter extends AbstractMessageConverter {
@Override
protected Message createMessage(Object o, MessageProperties messageProperties) {
byte[] bytes = JSON.toJSONBytes(o);
messageProperties.setContentType("application/json");
messageProperties.setContentEncoding("UTF-8");
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
byte[] bytes = message.getBody();
return JSON.parseObject(bytes, Object.class);
}
}
其实 AbstractMessageConverter 抽象类 也是实现了MessageConverter接口,继承AbstractMessageConverter去实现,主要是因为AbstractMessageConverter帮我们自动生成了消息的id,看看源码就知道。
消息反序列化(FastJson2)
使用FastJson解决mq反序列化,其实与上面jackson的反序列化一模一样,只是将 MappingJackson2MessageConverter 改成MappingFastJsonMessageConverter即可
@Configuration
public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer {
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
// 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
factory.setMessageConverter(new MappingFastJsonMessageConverter());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
}
提示: 如果你使用其他的json库,也都是这样实现序列化问题