RocketMQ4源码(五)消费者特性

2023年 8月 18日 76.2k 0

前言

本章基于4.6.0分析两个消费者特性:

  • 广播消费
  • 顺序消费
  • 虽然把顺序消费归类为消费者特性,实际上顺序消费需要producer和broker两个角色的支持,只不过大部分逻辑在consumer侧。

    注:本文结合第三章RocketMQ4源码(三)普通消息消费一起食用。

    一、广播消费

    案例

    设置MessageModel为BROADCASTING。

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_broadcasting_consumer");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    

    广播消费大致流程和普通消费还是一致的。

    启动

    consumer启动阶段,有三个特点。

    第一,广播消费不会自动订阅重试topic。

    所以广播消费肯定不支持消费重试。

    DefaultMQPushConsumerImpl#copySubscription:

    第二,MQClientInstance创建。

    集群消费,同一个进程中只有一个MQClientInstance实例。

    广播消费,同一个进程中也只有一个MQClientInstance实例。

    但是如果进程中同时存在集群消费和广播消费实例,这里会存在两个MQClientInstance实例。

    DefaultMQPushConsumerImpl#start:

    第三,调用OffsetStore#load加载消费进度。

    对于集群消费来说,RemoteBrokerOffsetStore是空实现。

    对于广播消费来说,LocalFileOffsetStore将从本地文件系统加载offset到内存table。

    DefaultMQPushConsumerImpl#start:

    LocalFileOffsetStore本地存储offset,不同group存储在不同路径:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json。

    区别于broker存储offset,broker将所有group都存储在一个consumerOffset.json文件中。

    rebalance

    RebalanceImpl#rebalanceByTopic:

    广播消费,在rebalance阶段只需要用路由中topic下所有queue分配给自己即可;

    集群消费,需要执行平均分配算法AllocateMessageQueueAveragely分配。

    RebalanceImpl#updateProcessQueueTableInRebalance:

    更新processQueueTable时,消费进度也不需要从broker获取,从本地LocalFileOffsetStore获取即可。

    pull

    DefaultMQPushConsumerImpl#pullMessage:

    集群模式下,pull消息可以顺便commit消费进度;

    广播模式下,消费进度由consumer自己管理在本地,所以不需要broker在pull消息时做offset提交逻辑;

    consume

    消费逻辑分为两步:1-调用用户Listener,2-处理消费结果。

    ConsumeMessageConcurrentlyService#processConsumeResult:

    处理消费结果,广播消费不具备重试能力,仅仅打印一些warn日志。

    回顾集群消费,支持将消费失败的消息,重新投递给broker,投递broker失败还支持延迟5s重新构建ConsumeRequest到本地。

    更新消费进度和集群消费一致,都是更新内存。

    MQClientInstance#startScheduledTask:

    消费进度的持久化完全依赖于客户端后台每5s的定时任务。

    consume超时

    ConsumeMessageConcurrentlyService#start:

    之前说到,并行消费模式下,每隔15分钟扫描分配给自己的所有ProcessQueue。

    如果ProcessQueue中有消息在用户Listener中执行超出15分钟,重新投递给broker进行重试。

    这里比较特别的是,虽然广播消费不支持消费重试,但是仍然做了这个事情,不知道有什么意义。

    二、顺序消费

    案例

    生产者需要使用MessageQueueSelector,按照业务诉求对于需要顺序执行消息,放入同一个MessageQueue。

    比如下面将同样尾号的订单消息放入同一个MessageQueue。

    DefaultMQProducer producer = new DefaultMQProducer("groupXXX");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    for (int orderId = 0; orderId < 100; orderId++) {
        Message msg = new Message("TopicABC", (orderId + "").getBytes());
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);
        System.out.printf("%s%n", sendResult);
    }
    

    消费者需要使用MessageListenerOrderly执行消费逻辑。

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_test");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("TopicABC", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    

    producer 发送

    DefaultMQProducerImpl#sendSelectImpl:

    相较于普通消息发送,顺序消息发送有两个区别:

  • 顺序消息由用户决定发送的MessageQueue;而普通消息由框架决定;
  • 顺序消息在框架层面不支持发送重试;而普通消息同步发送支持2次重试;
  • broker 全局锁api

    虽然consumer端通过AllocateMessageQueueAveragely平均分配算法,可以很好的处理queue的分配。

    但是这取决于rebalance时刻,broker返回给当前consumer实例的组内成员视图clientId集合。

    当消费组成员上下线时,不同组内成员可能读到的成员视图不同,导致多个组内成员可能在某一时刻消费同一个queue。(rebalance线程只能顺序处理成员变更)

    当消费组集群趋于稳定,最终每个queue会被组内某个consumer实例独占。

    为了保证queue消费的严格顺序,broker提供了两个api,来确保同一时刻一个queue只能被一个组内成员消费。

    • LOCK_BATCH_MQ:根据group、clientId,批量获取MessageQueue的独占锁;
    • UNLOCK_BATCH_MQ:根据group、clientId,批量释放MessageQueue的独占锁;

    RebalanceLockManager用一个mqLockTable管理每个消费组对于MessageQueue的锁定情况。

    LockEntry包含clientId的上次获取全局锁成功的时间(续期)。

    LockEntry#isExpired:

    为了防止consumer非正常下线,全局锁是有有效期的,默认60s未续期,锁就失效了。

    当锁过期,其他组内consumer实例就能竞争这个全局锁。

    LockEntry#isLocked:

    判断MessageQueue是否被当前client锁定,包含锁过期逻辑。

    释放全局锁

    有两种情况会释放全局锁:

  • 消费者rebalance,原先分配给自己的queue不再分配给自己,移除ProcessQueue;
  • 消费者正常shutdown下线;
  • RebalanceLockManager#unlockBatch:

    释放全局锁逻辑较为简单,循环客户端发送的所有queue,判断组内这个queue确实被当前客户端持有,则移除MessageQueue。

    注意点1,必须先获取一个ReentrantLock,保证单线程操作mqLockTable,否则和获取全局锁并发下会发生race-condition。

    注意点2,unlock没有任何返回值,即不关心是否unlock成功哪些queue。

    获取全局锁

    有两种情况会获取全局锁:

  • 消费者rebalance,新增分配queue给自己,新增ProcessQueue;
  • 消费者对已经分配给自己的queue执行全局锁续期;
  • RebalanceLockManager#tryLockBatch:

    对于锁续期,大部分情况下consumer组处于稳定状态。

    所以只需要简单校验并续期即可,不需要操作mqLockTable,所以可以走一个fast-path,不需要走ReentrantLock。

    RebalanceLockManager#isLocked:

    判断MessageQueue是否属于当前client且未过期,如果是的话直接续期,返回true。

    RebalanceLockManager#tryLockBatch:

    consumer组处于非稳定状态,要竞争全局锁,需要ReentrantLock保护下操作mqLockTable。

    注意这里ReentrantLock的范围是整个mqLockTable,即使两个消费组在这里也不能并发操作。

    本质上获取全局锁成功就是将当前clientId放到mqLockTable中。

    consumer rebalance

    rebalance阶段,为自己分配完MessageQueue后,更新内存processQueueTable。

    RebalanceImpl#updateProcessQueueTableInRebalance:

    无论是移除queue还是新增queue,对于顺序消费都有一些特殊逻辑。

    新增queue(获取全局锁)

    RebalanceImpl#updateProcessQueueTableInRebalance:

    对于新分配给自己的queue,需要调用lock。

    如果lock失败,不会将queue真正分配给自己,需要等到下次rebalance检测时再次获取锁。

    注:这里有个bug,新分配给自己的queue,首次创建ProcessQueue未设置lock标志位为true,在4.9.5版本修复,见ISSUE-5465。

    RebalanceImpl#lock:

    向MessageQueue所在broker发送一个LOCK_BATCH_MQ请求,其中包含本次要锁定的MessageQueue和当前消费组。

    broker返回锁定成功的MessageQueue,如果本次要锁定的MessageQueue在其中,则表示锁定成功,返回true。

    其他情况都当做锁定失败,返回false。

    移除queue(释放全局锁)

    RebalanceImpl#updateProcessQueueTableInRebalance:

    对于移除自己的queue,需要调用unlock,如果unlock失败,不会将queue真正移除。

    RebalancePushImpl#removeUnnecessaryMessageQueue:

    对于顺序消费需要执行unlock。

    这里需要获取ProcessQueue中的一个互斥锁(pq.getLockConsume.tryLock),与用户Listener消费逻辑互斥。

    如果用户代码正在消费ProcessQueue,那么这里可能tryLock失败,queue不能从processQueueTable中移除,还属于自己。

    RebalancePushImpl#unlockDelay:

    如果ProcessQueue中还有缓存的消息,延迟20s再执行unlock;否则立即执行unlock。

    RebalanceImpl#unlock:

    向MessageQueue所在broker发送一个UNLOCK_BATCH_MQ请求。

    如果oneway请求失败,有全局锁过期逻辑。

    consumer 收到消息

    PullCallback:

    当broker返回拉取到的消息后,consumer将消息缓存到ProcessQueue,并提交到ConsumeMessageService。

    ConsumeMessageConcurrentlyService#submitConsumeRequest:

    回顾并行消费,将消息按照批量消费大小(默认1),直接投递ConsumeRequest到消费线程池。

    而对于顺序消费,不能将消息直接分批丢到线程池里,否则会乱序。

    简单来想,可以将相同queue指定分配到同一个线程。

    比如ZooKeeper中,每个客户端session请求按照顺序执行。

    WorkerService#schedule通过hash算法,将sessionId分配到指定的一个单线程的ExecutorService执行。

    但是如果大部分session都分配到个别线程中,线程就无法充分利用。

    zk可以有很多客户端session,session越多,这种由于hash算法分配线程不均的情况就可以忽略。

    对于rocketmq来说,queue是有限的,假设一共分配给自己4个queue,消费线程数有2个。

    根据某种hash算法分配queue给thread。

    q0、q2分配给thread0,q1、q3分配给thread1。

    假设q0和q2都收到32条消息,q1和q3没有消息,那么thread1就没事干,对于q0或q2会产生消费延迟。

    这同时取决于queue数量、消费线程数量、hash算法。

    所以对于顺序消费,为了保证有序,且提高并行度,提交ConsumeRequest取决于ProcessQueue目前的情况。

    ProcessQueue#putMessage:

    在缓存消息到ProcessQueue时,会返回是否允许提交新的ConsumeRequest,这仅针对顺序消费有用。

    如果缓存消息(msgTreeMap)非空且当前没有线程在消费这个ProcessQueue(consuming=false),才允许提交新的ConsumeRequest。

    当然,针对消费逻辑,肯定也需要获取同一把锁,来设置consuming字段。(后面)

    ConsumeMessageOrderlyService#submitConsumeRequest:

    顺序消费,根据写入缓存消息后的ProcessQueue情况,提交ConsumeRequest到消费线程池。

    对于批量消费的情况,也不在提交前分批,还是为了有序。

    注:顺序消费的consume线程池和普通消费一致,都是20线程+无界LinkedBlockingQueue。

    consumer consume

    顺序消费的consume实现与普通消费的consume实现完全不同。

    获取本地锁

    按照上面收到消息的逻辑来说,同一个ProcessQueue,同一时间内应当只有一个线程处理,实际上并非如此。

    否则上来就应该对ProcessQueue上个writeLock,然后把consuming状态改了。

    之所以会有一个本地锁,和一些重试、补偿逻辑相关。

    ConsumeMessageOrderlyService.ConsumeRequest#run:

    MessageQueueLock本地锁逻辑简单,针对每个MessageQueue有一个Object。

    全局锁失效

    ConsumeMessageOrderlyService.ConsumeRequest#run:

    对于集群消费,需要判断ProcessQueue是否正常拿到broker的全局锁。

    ConsumeMessageOrderlyService#tryLockLaterAndReconsume:

    如果未获取到全局锁或全局锁已超时,则延迟100ms重新获取全局锁。

    如果获取全局锁成功,延迟10ms,重新提交ConsumeRequest;

    如果获取全局锁失败,延迟3s,重新提交ConsumeRequest。

    所以并非同一时刻同一队列只有一个ConsumeRequest,这也是为什么要上本地锁。

    ProcessQueue#isLockExpired:锁过期时间为30s,所以一定有地方在做锁续期。

    获取缓存Message

    ConsumeMessageOrderlyService.ConsumeRequest#run:

    在实际从ProcessQueue中获取消息前,再次判断了一些逻辑:

  • queue被rebalance移除,直接退出;
  • queue的全局锁失效,和上面一样,延迟10ms获取全局锁并重新提交ConsumeRequest;
  • 当前ConsumeRequest执行超过60s,延迟10ms再次提交ConsumeRequest;(应该是为了避免队列数超过consume线程数,同一个队列持续占用consume线程,导致其他队列无法及时消费)
  • ProcessQueue#takeMessags:

    不同于普通消费(普通消息在ConsumeRequest中直接分配了消息),顺序消费在当前consume线程中才开始分批拉取缓存消息。

    顺序消费将正在处理的消息放到consumingMsgOrderlyTreeMap另一个TreeMap中并返回。

    需要注意的是,在这里如果结果集为空,代表ProcessQueue中已经没有更多消息了。

    将consuming设置为false,可以允许收消息线程提交ConsumeRequest。

    ConsumeMessageOrderlyService.ConsumeRequest#run:

    如果ProcessQueue没有更多缓存的消息,退出consume,收到新消息将提交新的ConsumeRequest。

    执行MessageListenerOrderly

    ConsumeMessageOrderlyService.ConsumeRequest#run:

    执行用户MessageListenerOrderly逻辑消费。

    有几个关注点。

    第一,顺序消费不存在消费超时逻辑。

    ConsumeMessageConcurrentlyService.ConsumeRequest#run:

    普通消费,会在调用用户代码前给msg注入消费开始时间,用于超时检测。

    第二,顺序消费上下文。

    ConsumeOrderlyContext属性:

  • messageQueue:队列;
  • autoCommit:是否自动提交,默认true;
  • suspendCurrentQueueTimeMillis:消费失败挂起毫秒数;
  • 手动提交,主动调用OffsetStore更新消费进度。

    需要context中两个属性配合,一个是autoCommit=false,另一个是messageQueue。

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_test");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("TopicABC", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit(false);
            for (MessageExt msg : msgs) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                consumer.getOffsetStore().updateOffset(context.getMessageQueue(), msg.getQueueOffset(), true);
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    

    第三,消费结果。

    ConsumeOrderlyStatus没有废弃的只有两个:

  • SUCCESS:消费成功;
  • SUSPEND_CURRENT_QUEUE_A_MOMENT:消费失败,挂起当前queue一段时间;
  • 当用户代码发生异常或返回null,都当做SUSPEND_CURRENT_QUEUE_A_MOMENT。

    处理消费结果

    ConsumeMessageOrderlyService#processConsumeResult:

    默认自动提交,支持手动提交。

    自动提交消费成功,更新OffsetStore内存消费进度offset。

    和普通消费一致,内存消费进度每隔5s定时同步给broker。

    消费成功

    手动提交什么都不做,自动提交是默认情况。

    ConsumeMessageOrderlyService#processConsumeResult:

    ProcessQueue提交返回offset,OffsetStore更新内存中的消费进度offset。

    ProcessQueue#commit:

    清空正在处理的消息TreeMap(consumingMsgOrderlyTreeMap),

    返回这批消息的最大offset+1作为提交offset。

    消费挂起(重试)

    ConsumeMessageOrderlyService#processConsumeResult:

    校验是否到达最大重试次数。

    如果超出重试次数,直接提交offset(如果非自动提交,这里忽略),继续处理后续消息;

    如果未超出重试次数,将消息重回内存ProcessQueue,延迟1s后重新消费,退出本次consume;

    ConsumeMessageOrderlyService#checkReconsumeTimes:

    如果超出最大消费次数,sendMessageBack给broker,如果发送broker失败,兜底走未超出最大消费次数逻辑;

    如果未超出最大消费次数,挂起consumer;

    ConsumeMessageOrderlyService#getMaxReconsumeTimes:

    默认最大消费次数是Integer.MAX_VALUE。

    所以顺序消费默认失败只能重回本地ProcessQueue,并不会sendMessageBack给broker。

    因为sendMessageBack给broker会破坏顺序语义。

    ProcessQueue#makeMessageToCosumeAgain:重回ProcessQueue。

    从处理中treeMap移除,重新放回msgTreeMap。

    consumer 锁续期

    ConsumeMessageOrderlyService#start:

    虽然顺序消费没有consume超时消息逻辑,但是每隔20s会对所有ProcessQueue重新获取全局锁。

    RebalanceImpl#lockAll:

    Step1,从processQueueTable中收集broker和MessageQueue的映射关系,循环每个broker。

    Step2,LOCK_BATCH_MQ向broker发送分配给自己的MessageQueue。

    Step3,对于成功获取锁的MessageQueue,设置locked=true,续期。

    Step4,对于未成功获取锁的MessageQueue,设置locked=false。

    这能够补偿当前consumer实例rebalance阶段对于某个queue由于oneway unlock失败的情况。

    总结

    广播消费

    广播消费与集群消费的区别:

  • 启动阶段,未自动订阅重试topic=%RETRY%+消费组;
  • 启动阶段,如果当前进程中同时存在广播消费和集群消费者实例,底层会存在2个MQClientInstance实例,比如心跳就会发两份;
  • 启动阶段,OffsetStore实现选择LocalFileOffsetStore,加载本地文件系统({user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json)中的offset到内存;
  • rebalance阶段,直接将nameserver返回的topic下所有MessageQueue分配给自己,不需要执行分配策略;
  • pull阶段,不会顺便提交offset到broker,因为广播消费进度由每个实例自己管理在本地文件系统;
  • consume阶段,广播消费不支持重试,仅仅会打印warn日志;
  • consume超时,很奇怪,会sendMessageBack给broker;
  • 顺序消费

    生产者侧

  • 用户使用层面,需要通过MessageQueueSelector指定发送MessageQueue;
  • 框架api层面,不支持重试(普通发送消息支持2次);
  • broker侧

    broker提供全局锁相关api给consumer使用,包括批量获取锁LOCK_BATCH_MQ和批量释放锁UNLOCK_BATCH_MQ。

    请求参数包含:group-消费组、clientId-消费者实例id、mqSet-目标MessageQueue集合。

    RebalanceLockManager维护了一个mqLockTable,维护group-queue-clientId(LockEntry)的映射关系。

    本质上获取全局锁就是将自己的group-queue-clientId放入这个table中。

    为了防止consumer非正常下线,每个锁有超时时间60s,如果客户端没有及时续期,组内其他consumer可以争抢这个queue。

    consumer侧

    为了保证queue顺序消费,consumer需要从两方面保证:

  • 进程间,rebalance需要向broker获取全局锁,只有获取全局锁成功才能消费;
  • 进程内,同一个queue的消息同时只能由一个线程顺序处理;
  • 顺序消费与并行消费的区别:

  • rebalance阶段,更新processQueueTable,对于新增的queue需要从broker获取全局锁,对于移除的queue需要从broker释放全局锁,否则不能真正新增或移除ProcessQueue,调用broker失败的情况由rebalance和锁续期来补偿;
  • 收到消息阶段,消息不能直接分批丢到Consume线程池中,通过ProcessQueue的状态来确保Consume线程池中每个queue只有一个在运行的ConsumeRequest;
  • consume阶段,逻辑与并行消费完全不同:
  • 先获取queue对应本地锁;
  • 保证全局锁正常的情况下,执行用户MessageListenerOrderly逻辑;
  • 处理消费结果
  • consume阶段,消费成功,支持手动提交和自动提交,默认自动提交,OffsetStore更新内存offset;
  • consume阶段,消费失败,挂起重试,默认重试次数无上限,将所有消息重回ProcessQueue内存,延迟1s后再次走consume;可选择设置重试次数上限,支持将消息sendBack给broker,不挂起直接消费后续消息,但是破坏顺序语义;
  • 锁续期,顺序消费没有消费超时逻辑,但是有锁续期逻辑,每隔20s会对所有ProcessQueue重新获取全局锁;
  • 欢迎大家评论或私信讨论问题。

    本文原创,未经许可不得转载。

    欢迎关注公众号【程序猿阿越】。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论