spring boot整合RabbitMQ,使用fastjson2实现序列化

2023年 7月 13日 22.0k 0

1.问题介绍

在Springboot项目中使用RabbitMQ作为消息中间件使用过程中,有时候我们会使用java对象作为传输的消息,默认情况下允许使用java对象作为传输的消息,如果强制性的使用Java对象作为消息(不使用json工具的情况下),会因为序列化和反序列化导出运行时异常, 但是却有着严格的条件限制:

  • 生产者和消费者的Java对象必须相同
  • Java对象的类路径(包名)必须相同
  • 如果在项目中开发者和消费者属于同一个项目则没有问题,如果开发者和消费者属于不同项目则第2个条件则比较麻烦,当然也有解决办法。比如实体类作为基础包(微服务/多模块项目中叫做基础模块)引入生产者和消费者项目中等等。

    还有一种方法是通过在生产者中将java对象转为json字符串,在消费者中将json字符串转回java对象。这种方法有很多弊端,首先要增加代码量(互相转换),还有就是针对不同的对象需要增加转换的代码。

    2.解决办法

    消息序列化(Jackson2)

    针对消息生产者,我们都是用RabbitTemplate发起消息,在RabbitTemplate对象中有一个方法专门用来实现Java对象的序列化,

    @SpringBootTest
    public class MqTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void producertTest(){
         // setMessageConverter 方法就是修改序列化
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            User user = new User();
            user.setName("张三");
            user.setAge(23);
            rabbitTemplate.convertAndSend(MqConstant.Exchange,"",user);
        }
    }
    

    注意: 上面这种序列化方式太low了,如果针对多个发起消息或者在不同的地方使用,每次都还要去修改序列化,所以,我们就将RabbitTemplate配置成一个Bean,我们每次去依赖注入的也就是这个Bean。

    @Configuration
    public class RabbitMQConfig {
    
        @Bean
        @SuppressWarnings("all")
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            // 在注入Bean的时候必须将连接工厂装配进来,因为这是自己定义的一个Bean
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // setMessageConverter 方法使用jackson2去实现序列化
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    }
    

    消息反序列化(Jackson2)

    消费者就是针对 @RabbitListener 这个注解反序列化,就要去实现RabbitListenerConfigurer 接口。

    @Configuration
    public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer{
    
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
            factory.setMessageConverter(new MappingJackson2MessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
            rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    }
    

    在反序列化中,你可以使用 @Payload 注解直接将消息转换成Java对象,例如下面.

    @Service
    public class RabbitService {
    
        @RabbitListener(queues = MQConstant.QUEUE)
        public void message(@Payload User message){
            System.out.println(message);
        }
    }
    

    在后面还有很多注解可以使用,例如获取消息的id,获取消息的头。这些都是在使用spring boot整合RabbitMQ框架中最常用的方式

    注意: 实现序列化和反序列化都是实现 MessageConverter 接口 ,结果一个严重的问题出现了,如果你是通过第三方json框架去封装的消息序列化,那么你就要特别注意了。

    实现序列化使用的是:org.springframework.amqp.support.converter 包下的 MessageConverter

    实现反序列化使用的是:org.springframework.messaging.converter 包下的MessageConverter

    消息序列化(FastJson2)

    Fastjson就是一个第三方框架,之所以在项目常见到它,就是因为他的解析速度快的惊人(他们官方自己说的),并且在这次项目中我们使用的是Fastjson2版本,解析速度更不用说了。

    下面我们要在spring boot中使用第三方框架实现RabbitMQ的序列化问题,那么就要先去找FastJson中是否有这个实现 org.springframework.amqp.support.converter 包下的 MessageConverter 接口 或 继承 AbstractMessageConverter 抽象类的实现类,发现没有,那我们就要去实现。

  • 导入fastJson的依赖
  • 
        com.alibaba.fastjson2
        fastjson2-extension-spring5
        2.0.34
    
    
  • 排除spring boot的默认Jackson依赖,也可以不排除,个人习惯
  • 
        org.springframework.boot
        spring-boot-starter-web
        
            
                org.springframework.boot
                spring-boot-starter-json
            
        
    
    
  • 实现 MessageConverter 接口或者AbstractMessageConverter抽象类
  • import com.alibaba.fastjson2.JSON;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    public class FastjsonMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            byte[] bytes = JSON.toJSONBytes(object);
            messageProperties.setContentType("application/json");
            messageProperties.setContentEncoding("UTF-8");
            messageProperties.setContentLength(bytes.length);
            return new Message(bytes, messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            byte[] bytes = message.getBody();
            return JSON.parseObject(bytes, Object.class);
        }
    }
    
    import com.alibaba.fastjson2.JSON;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.AbstractMessageConverter;
    import org.springframework.amqp.support.converter.MessageConversionException;
    
    public class FastjsonMessageConverter extends AbstractMessageConverter {
        @Override
        protected Message createMessage(Object o, MessageProperties messageProperties) {
            byte[] bytes = JSON.toJSONBytes(o);
            messageProperties.setContentType("application/json");
            messageProperties.setContentEncoding("UTF-8");
            messageProperties.setContentLength(bytes.length);
            return new Message(bytes, messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            byte[] bytes = message.getBody();
            return JSON.parseObject(bytes, Object.class);
        }
    }
    

    其实 AbstractMessageConverter 抽象类 也是实现了MessageConverter接口,继承AbstractMessageConverter去实现,主要是因为AbstractMessageConverter帮我们自动生成了消息的id,看看源码就知道。

    image.png

    消息反序列化(FastJson2)

    使用FastJson解决mq反序列化,其实与上面jackson的反序列化一模一样,只是将 MappingJackson2MessageConverter 改成MappingFastJsonMessageConverter即可

    @Configuration
    public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer {
    
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
            factory.setMessageConverter(new MappingFastJsonMessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
            rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    }
    

    提示: 如果你使用其他的json库,也都是这样实现序列化问题

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论