相当通俗易懂的RocketMQ顺序消息

2023年 8月 18日 94.6k 0

归去,也无风雨也无晴。

DSC_0414.JPG

哈喽大家好呀,好久不见,今天继续来跟大家聊一聊RocketMQ吧,上回书说到了RocketMQ的性能优化,有兴趣的同学可以回去翻一翻上一篇文章:
关于RocketMQ那些你可能不知道的性能优化

其实顺序消息是业务中常用的功能之一,只要是使用MQ的,就不太可能绕的过去这个问题。就算是业务上不需要保证顺序消息,但是作为开发,作为码农,怎么着都要跟你的PM确认一下需求的啦,那肯定就会聊到MQ的顺序消息的啦!

举一个简单的例子,咱们就不说老生常谈的购物交易系统,订单系统之类的,就比如数据库的BinLog消息,数据库执行新增,修改,删除语句,那BinLog记录的也必须是新增,修改,删除语句,这个很好理解吧。

那话不多说,咱们直接开整~

风和日丽,天朗气清,又是适合面试的一天,作为一个十面九offer的面试者,又来拜访面试官了。

小伙子,上次咱们聊到RocketMQ的性能优化,你回答的不错,那你知道RocketMQ是怎么保证顺序消息的吗?

哎哟,这不是侮辱人呢吗,我都自己聊到RocketMQ了,我害怕你问?那我害怕你不问呢。

推了推眼镜,故作思考:

是这样的面试官大佬,顺序消息呢,分为顺序发送和顺序接收,而在RocketMQ中呢,本身是支持顺序消息的, 所以在代码上实现起来也是很简单的。

顺序发送消息:

  • 首先,我们可以把我们spring.properties中的配置更改为以下配置,保证同步发送:

    spring.cloud.stream.rocketmq.bindings.output.pruducer.sync=true

  • 其次,我们在业务代码中发送消息时,指定消息发送的Header,指定发送到第0个消息队列:

    @RestController
    public class OrderlyController {
        @Autowired
        private Source source;
        
        @GetMapping(value = "/orderly")
        public String orderlySend() {
            List messageList = Arrays.asList("insert", "update", "delete");
            
            for (String message : messageList) {
                MessageBuilder builder = MessageBuilder.withPayload(message).setHeader(BindingHeaders.PARTITION_HEADER, 0);
                
                Message msg = builder.build();
                source.output().send(message);
            }
            return "success";
        }
    }
    
  • 顺序接收消息:

  • 同样的,我们也需要把spring.properties中的配置文件改为以下配置,保证同步接收:

    spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true

  • 我们在消费消息的时候,只需要正常的消费消息就可以了,代码嘛,随便写写就好咯:

    @StreamListener(value = Sink.INPUT)
    public void receive(String receivedMessage) {
        System.out.println(receivedMessage);
    }
    
  • 顺序发送消息原理:

    说起这个不难,那小伙子你知道这个顺序消息的原理吗?RocketMQ他是怎么实现顺序消息的呢?

    咳咳。。。你以为我为什么说一半不说了,呐,你这不是跳进来了吗?这个时候我知道答案,怎么办,很急,很想一股脑全说出来,但是!我知道很急,但我先别急,等他三五秒钟。

    RocketMQ呢,它的顺序消息其实分为两种,第一种是局部有序,也就是前面我说的那一种,第二种就是全局有序了。

    那什么是局部有序,什么是全局有序呢?

    局部有序:

    好问题!

    局部有序呢是指发送到同一个消息队列的消息是有序的,可以在发送消息的时候指定发送的消息队列(参考上文发送顺序消息),同样,在消费消息时也按顺序消费。例如,同一个用户的数据库操作需要保证有序,不同的用户的数据库操作就没有约束,相互之间不会影响,可以是并行的。

    全局有序:

    理解了上一个,那这一个就比较好理解了,我们只需要设置Topic只有一个队列,就可以实现全局有序了(有没有吐了一口老血?),我们需要在创建Topic的时候手动设置只创建一个队列。但是这一类的场景在国内比较少见,因为国内用户基数大嘛,这种性能低下的方式通常不会被推荐使用。

    同时呢,RocketMQ中消息发送有三种方式:同步,异步,单向。

    同步发送:发送网络请求后会同步等待Broker服务器的返回结果,支持发送失败重试,适用于比较重要的消息通知业务场景。

    异步发送:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求相对较高的业务场景。

    单向发送:单向发送的原理和异步一致,但不支持回调。适用于响应时间非常短,可靠性不高的场景,比如:日志。

    其实顺序发送消息的原理很简单,只需要将同一类消息发送到相同的队列即可。为了保证先发送的消息先到达消息队列,那就必须使用同步的消息发送模式,否则可能发生后发送的消息反而先到消息队列的场景,这样的话,消息就乱序了。

    这里咱们也看一下RocketMQ的源码,分析一波:

    image-20230817233124316.png

    选择队列的过程由MessageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成:

    • 根据hashKey计算hash值,hashKey时我们前面例子中的用户的唯一键,例如用户ID之类的,所以计算出的hash值完全相同。
    • 用hash值和队列数mqs.size()取模,得到一个索引值,结果当然是小于队列数的。
    • 根据索引值从队列列表中取出一个队列msq.get(value),hash值相同则队列相同。

    在队列列表获取的过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便于下次从缓存中读取。

    哟,小伙子不错嘛,这也能答上来,那我再问问你,RocketMQ中除了顺序消息还有其他类型的消息吗?

    额。。当然有啦,除了顺序消息之外,RocketMQ还支持事务消息和延迟消息,除了这三种之外,其他的嘛,都可以称之为普通消息。日常开发中最常用的场景也是普通消息,这也是因为最常用的场景是系统间的异步解耦和削峰填谷,所以在这些场景下,尽量优先要保证的是消息的高效收发。

    普通消息和顺序消息对比来看呢,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列主要有两种机制,这里简单说一下轮询:

    • 轮询(默认):作为默认选择的机制,也通俗易懂,一个Topic有多个队列,轮询选择其中一个就好了。原理是路由信息TopicPublishInfo中维护了一个计数器,每发送一次都要 查询一次路由,并把计数器+1,通过计数器的值index与队列的数量取模计算来实现轮询算法。

      轮询机制固然简单好用,但是有一个弊端,如果轮旋选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息重新发送冲的时候重新选择队列,也有可能选择在宕机的Broker上,无法规避发送失败的情况,所以就有了故障规避的机制,后续我们慢慢聊这个。

    image-20230817235016767.png

    顺序消费消息原理:

    你说了这么多,那顺序消费消息又是如何实现的呢?

    哦哦,这个问题是这样的,RocketMQ呢支持两种消息模式,集群消费和广播消费。这两者的区别是,在广播消费模式下,每条消息会被ComsumerGroup的每个Comsumer消费,在集群消费模式下,每条消息只会被ConsumerGroup中的一个Consumer消费。

    其实呢,多数场景下都是使用集群消费,消息没消费一次就代表一次业务处理,这也不难理解,就像是每条消息被集群模式下的单个服务实例来处理。少数场景下会使用广播消费,比如数据发生变化,需要每一个收到此消息的服务实例清除一次缓存,这就需要整个集群中每个消费者都消费一次消息。默认情况下呢,是集群消费,我们也就分析此条件下的顺序消费。

    顺序消费也叫有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉去消费。Consumer中有多个消费线程,多个线程会同时消费消息,那在此条件下,很容易就想到独占锁,没错,顺序消费中,消费线程会先申请独占锁,得到锁之后才会被允许消费,我们来看看源码:

    image-20230818000710452.png

    image-20230818000739924.png
    消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已经消费过的消息。顺序消费中,如果消费线程顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,則不会提交消费进度,消费进度会阻塞在当前这条消息,并不会维续消费该队列中后续的消息,从而保证瓶宇消费。在版序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。

    并发消费的技术原理:

    面试官表面故作正定,内心实则大为震惊,缓缓说道:除了顺序消费,还有其他消费模式吗?

    我真是***了,还不够???

    其实是有的,Rocke MQ 支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。并发消费也称为乱序消费,其原理是同一个消息队列提供给 Consumer 中的多个消费线程拉取消费。Consumer 中会维护一个消费线程池,多个消费线程可以井发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前消费线程拉取的消息会进行重试,不影响其他消费线程和消意队列的消费进度,消费成功的线程正常提交消费进度。

    并发消费相比顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。

    消息的幂等性:

    哟,看来还真是难不倒你嘛,那你再说说消息的幂等性吧。

    好的面试官,我们在业务处理中经常会遇到幂等性的问题,同一个消息是否能够被多次消费,如果重复收到,多次消费是否符合业务逻辑?

    在RocketMQ这里,它是不保证消息不被重复消费的,如果业务队消费重复非常敏感,那必须要在业务层面进行幂等性处理,具体实现可以有多种方式,当然,也可以通过分布式锁来实现:

    在所有的消息系统中,包括rabbitMQ,kafka,rocketMQ,一共有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确一次),分布式消息都是在三者之间取平衡,前两者是可行的并且广泛使用的。

    at-most-once(最多一次):消息投递后不论消费是否成功,不会再重复投递,可能会导致消费未被消费,RocketMQ未使用这种方式。

    at-least-once(最少一次):消息投递后,向服务器返回ACK(消息确认机制),没有消费则一定不会返回ACK,由于网络异常等其他情况,服务器没能接收到客户端返回的ACK,服务器会再次投递,这就会导致重复消息,RocketMQ就是通过ACK确认来确保消息至少被消费一次。

    exactly-only-once(精确一次):必须满足两个条件,第一,发送消息阶段,不允许发送重复消息,第二,消费消息阶段,不允许消费重复消息。在分布式的环境中,如果要实现该模式,巨大的开销是少不了的,为了追求高性能,RocketMQ不保证此特性,无法避免消息重复,改为业务来保证幂等性。

    可以小伙子,看来你还不错,基础还不错,今天也不早了,你先回去吧,明天我们再聊聊事务消息?

    咳咳...好的面试官大大!(内心OS:还来?你是不是天天闲的啊?我看你比我天天背八股文都闲)

    后记:

    写这篇看似简单的文章,也翻了一些书籍,查了一些资料,也不容易哈哈哈,希望能帮助到你!

    另外RocketMQ也写了一大半了,快要结束了,大家后续有什么想要看的,可以评论告诉我哦。

    顺便再次吐槽,mac自带的输入法难用到极点!真的!

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论