RocketMQ Reset Consumer Offset: Source Code Analysis

July 12, 2023

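This is weihubeats. If you find this article useful, you can follow the WeChat official account 小奏技术, where articles are published first. No marketing accounts, no clickbait.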

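Background

I have recently been running into frequent errors when resetting consumer offsets in RocketMQ, so I decided to look into how RocketMQ actually resets a consumer's offset.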

RocketMQ version

  • 5.1.0
  • Dashboard: latest code on the GitHub master branch

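Source code analysis

dashboard

Feature entry point

This feature lives on the RocketMQ dashboard page.

We click the skip-backlog button and watch which endpoint gets called.

The network request shows that the skipAccumulate.do endpoint is called.

Here are the request parameters: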

{
    "resetTime": -1,
    "consumerGroupList": [
        "gid-xiao-zou-topic"
    ],
    "topic": "xiao-zou-topic",
    "force": true
}

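The request carries a consumer group (gid), a topic, and force set to true.

We will come back to the force parameter during the source analysis.

A global search in the dashboard project locates this endpoint.

It follows the traditional MVC layering of controller, service, and serviceImpl.
We go straight to the implementation class:

  • ConsumerServiceImpl

The core of it is a call to org.apache.rocketmq.tools.admin.MQAdminExt#resetOffsetByTimestamp: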

Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

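The parameters match the fields of the JSON request we saw earlier.
topic and group are self-explanatory. The timestamp is a special case: the value passed here is -1. isForce is passed as true.
isForce indicates whether the consumer progress should be reset forcibly.
When force is true, the progress is always reset to the offset that corresponds to the given timestamp, regardless of where the consumer currently is.

When force is false, the progress is reset only if the offset that corresponds to the given timestamp is smaller than the consumer's current offset, that is, only when the reset moves consumption backward, so that unconsumed messages are not skipped.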
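The force rule can be written down as a tiny decision function. This is a minimal sketch rather than RocketMQ code: the class and method names are made up for illustration, and it assumes that a non-forced reset which would move the group forward simply keeps the current offset. It follows the broker-side rule described in the source comments (reset when forced, or when the target offset is smaller than the current one).

```java
// Minimal sketch of the force rule; OffsetResetRule is a hypothetical name, not a RocketMQ class.
final class OffsetResetRule {
    private OffsetResetRule() {
    }

    /**
     * @param isForce       the force flag from the request
     * @param targetOffset  offset resolved from the requested timestamp
     * @param currentOffset offset the consumer group has currently reached
     * @return the offset the group should continue from
     */
    static long chooseOffset(boolean isForce, long targetOffset, long currentOffset) {
        if (isForce || targetOffset < currentOffset) {
            return targetOffset;
        }
        // Assumption: a non-forced reset never moves the group forward.
        return currentOffset;
    }
}
```

For example, with a current offset of 40 and a target of 100, chooseOffset(false, 100, 40) keeps 40, while chooseOffset(true, 100, 40) jumps to 100 and skips the backlog.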

RocketMQ

Now that we have the entry point, we go back to the RocketMQ source and look at
org.apache.rocketmq.tools.admin.MQAdminExt#resetOffsetByTimestamp:

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}
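To make the per-broker fan-out in this method easier to see in isolation, here is a minimal self-contained sketch. It is not the RocketMQ API: ResetFanOut and resetOnBroker are hypothetical names, and plain strings stand in for broker addresses and message queues.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: String stands in for both a broker address and a message queue identifier.
final class ResetFanOut {
    private ResetFanOut() {
    }

    /**
     * Calls resetOnBroker once per non-null broker address and merges the
     * per-broker offset tables into a single table.
     */
    static Map<String, Long> resetOnAllBrokers(List<String> brokerAddrs,
        Function<String, Map<String, Long>> resetOnBroker) {
        Map<String, Long> allOffsets = new HashMap<>();
        if (brokerAddrs == null) {
            return allOffsets;
        }
        for (String addr : brokerAddrs) {
            if (addr == null) {
                continue; // no usable address for this broker
            }
            Map<String, Long> offsets = resetOnBroker.apply(addr);
            if (offsets != null) {
                allOffsets.putAll(offsets);
            }
        }
        return allOffsets;
    }
}
```

As in the original method, an exception thrown while resetting one broker propagates to the caller, so a single failing broker aborts the whole call.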

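Let's walk through resetOffsetByTimestamp:

  • Fetch the topic's route metadata, topicRouteData, from the NameServer
  • From topicRouteData, get the brokers that host the topic, List<BrokerData> brokerDatas
  • Loop over the brokers and send a reset-offset request to each of them

The first two steps are not our focus. Let's look at the source code behind the third one: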

      public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);
        // offset is -1 means offset is null
        requestHeader.setOffset(-1L);
    
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }
    
        throw new MQClientException(response.getCode(), response.getRemark());
    }
    

    The real logic lives in the broker, so we use the request code INVOKE_BROKER_TO_RESET_OFFSET to find the corresponding broker-side code:

    • AdminBrokerProcessor
    public RemotingCommand resetOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final ResetOffsetRequestHeader requestHeader =
            (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
        LOGGER.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
            requestHeader.getTimestamp(), requestHeader.isForce());
    
        if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
            String topic = requestHeader.getTopic();
            String group = requestHeader.getGroup();
            int queueId = requestHeader.getQueueId();
            long timestamp = requestHeader.getTimestamp();
            Long offset = requestHeader.getOffset();
            return resetOffsetInner(topic, group, queueId, timestamp, offset);
        }
    
        boolean isC = false;
        LanguageCode language = request.getLanguage();
        switch (language) {
            case CPP:
                isC = true;
                break;
        }
        return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
            requestHeader.getTimestamp(), requestHeader.isForce(), isC);
    }
    
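  • Check whether broker-managed offset reset is enabled (isUseServerSideResetOffset). Before 5.0 this was always handled on the client side; to be cloud-native friendly and to support HTTP access, later versions also let the broker manage consumer offsets
  • If the broker does not manage the offsets, this.brokerController.getBroker2Client().resetOffset is called
  • The implementation of resetOffset is fairly long, so we will go through it step by step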

    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
        boolean isC) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // Look up the topic's configuration on this broker
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        // Fail if this broker does not have the topic
        if (null == topicConfig) {
            log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
            return response;
        }
    
        Map<MessageQueue, Long> offsetTable = new HashMap<>();
        // Iterate over every write queue of the topic
        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setTopic(topic);
            mq.setQueueId(i);
            // Query the group's current consume offset for this queue
            long consumerOffset =
                this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
            // An offset of -1 means no consume progress is recorded for this group
            if (-1 == consumerOffset) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("THe consumer group <%s> not exist", group));
                return response;
            }
    
            long timeStampOffset;
            // A timestamp of -1 means "skip the backlog": use the queue's max offset; otherwise look up the offset for the given time
            if (timeStamp == -1) {
    
                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
            } else {
                timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
            }
            
            // If the resolved offset is invalid (negative), fall back to 0
            if (timeStampOffset < 0) {
                log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
                timeStampOffset = 0;
            }
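            // If force is set, reset unconditionally;
            // otherwise the target offset must be smaller than the current consume offset, to avoid losing messages
            if (isForce || timeStampOffset < consumerOffset) {
            // [Part of the listing is missing at this point (lost when the page was extracted).
            //  Judging from the surrounding code, the missing lines record the chosen offset in offsetTable,
            //  build the `request` sent below from `requestHeader`, and loop over the group's client
            //  channels (`entry`), reading each client's `version`.]
                if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {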
                    try {
                        this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                        log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                            topic, group, entry.getValue().getClientId());
                    } catch (Exception e) {
                        log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
                            topic, group, e.toString());
                    }
                } else {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("the client does not support this feature. version="
                        + MQVersion.getVersionDesc(version));
                    log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
                        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                    return response;
                }
            }
        } else {
            String errorInfo =
                String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                    requestHeader.getGroup(),
                    requestHeader.getTopic(),
                    requestHeader.getTimestamp());
            log.error(errorInfo);
            response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
            response.setRemark(errorInfo);
            return response;
        }
        response.setCode(ResponseCode.SUCCESS);
        ResetOffsetBody resBody = new ResetOffsetBody();
        resBody.setOffsetTable(offsetTable);
        response.setBody(resBody.encode());
        return response;
    }
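    The per-queue target offset lookup in this method can be isolated as a small sketch. The QueueStore interface and its method names are hypothetical stand-ins for the broker's message store, introduced only so the example is self-contained. It shows the two cases: a timestamp of -1 resolves to the queue's max offset, any other timestamp is looked up by time, and a negative result falls back to 0.

```java
// Sketch only: QueueStore is a hypothetical stand-in for the broker's message store.
interface QueueStore {
    long maxOffset(int queueId);

    long offsetByTime(int queueId, long timestamp);
}

final class TargetOffsetResolver {
    private TargetOffsetResolver() {
    }

    /** Resolves the offset a reset request points at for one queue. */
    static long resolve(QueueStore store, int queueId, long timestamp) {
        long offset = (timestamp == -1)
            ? store.maxOffset(queueId)                 // -1: skip the backlog
            : store.offsetByTime(queueId, timestamp);  // otherwise: look up by time
        // A negative lookup result is treated as invalid and replaced by 0.
        return Math.max(offset, 0L);
    }
}
```

This is why the dashboard's skip-backlog button can send resetTime = -1: no real timestamp is needed to jump to the end of each queue.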
    

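    As we can see, the broker reads the consume offsets it stores, works out the new offsets, and then sends a request to every client of the group telling them to update their local offsets. So resetting the offset can fail in several cases:

  • The topic does not exist
  • The consumer group does not exist (no consume progress is recorded for it)
  • No consumer of the group is connected

    Note that even if the group has a message backlog, the reset still fails when none of its consumers is connected to the broker.
    The reset in fact works by notifying all clients to pull messages from the broker starting at the new offsets. It does not modify the consume offsets stored on the broker.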
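    The failure cases can be summarized as a precheck, evaluated in the same order as the broker code: topic first, then the recorded offsets, then the online clients. This is an illustrative sketch only: the enum and its constant names are invented here and are not RocketMQ response codes.

```java
// Sketch only: these names are illustrative, not RocketMQ response codes.
enum ResetPrecheck {
    OK,
    TOPIC_NOT_FOUND,
    GROUP_OFFSET_NOT_FOUND,
    CONSUMER_NOT_ONLINE;

    /**
     * @param topicExists   whether the broker knows the topic
     * @param storedOffsets the group's stored offset per queue, -1 meaning "none recorded"
     * @param onlineClients number of consumers of the group connected to the broker
     */
    static ResetPrecheck check(boolean topicExists, long[] storedOffsets, int onlineClients) {
        if (!topicExists) {
            return TOPIC_NOT_FOUND;
        }
        for (long offset : storedOffsets) {
            if (offset == -1) {
                return GROUP_OFFSET_NOT_FOUND;
            }
        }
        if (onlineClients <= 0) {
            // A backlog alone is not enough: somebody has to receive the notification.
            return CONSUMER_NOT_ONLINE;
        }
        return OK;
    }
}
```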

    Let's look at how the client finally handles this notification.
    The request code is 220:

    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    

    • ClientRemotingProcessor
    this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
    
    public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        DefaultMQPushConsumerImpl consumer = null;
        try {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl instanceof DefaultMQPushConsumerImpl) {
                consumer = (DefaultMQPushConsumerImpl) impl;
            } else {
                log.info("[reset-offset] consumer dose not exist. group={}", group);
                return;
            }
            consumer.suspend();
    
            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                    ProcessQueue pq = entry.getValue();
                    pq.setDropped(true);
                    pq.clear();
                }
            }
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException ignored) {
            }
    
            Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                MessageQueue mq = iterator.next();
                Long offset = offsetTable.get(mq);
                if (topic.equals(mq.getTopic()) && offset != null) {
                    try {
                        consumer.updateConsumeOffset(mq, offset);
                        consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                        iterator.remove();
                    } catch (Exception e) {
                        log.warn("reset offset failed. group={}, {}", group, mq, e);
                    }
                }
            }
        } finally {
            if (consumer != null) {
                consumer.resume();
            }
        }
    }
    

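    The overall flow is as follows:

  • Look up the consumer whose offset is to be reset, then suspend its consumption

  • Get the consumer's process queue table, processQueueTable, and iterate over its entries. For every message queue mq that satisfies topic.equals(mq.getTopic()) && offsetTable.containsKey(mq):
    a. Mark the corresponding process queue pq as dropped, i.e. pq.setDropped(true)
    b. Clear the process queue pq, i.e. pq.clear()

  • Sleep the thread for 10 seconds

  • Iterate over processQueueTable again. For every message queue mq that satisfies topic.equals(mq.getTopic()) && offset != null:
    a. Get the offset for mq from offsetTable
    b. Try to update the consumer's offset to that value, i.e. consumer.updateConsumeOffset(mq, offset)
    c. Remove the queue through the consumer's rebalance implementation, i.e. consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq))
    d. Remove the current message queue mq from processQueueTable using the iterator's remove() method

  • Resume consumption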
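    The steps above can be sketched with simplified stand-in types. This is not the RocketMQ client: ClientResetSketch, LocalQueue, and the string queue keys are hypothetical, the topic check is folded into the queue key, and the wait time is a parameter so the example does not have to sleep for 10 seconds.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Sketch only: simplified stand-ins for the push consumer and its process queues.
final class ClientResetSketch {
    /** Hypothetical stand-in for ProcessQueue: the local cache of pulled messages. */
    static final class LocalQueue {
        volatile boolean dropped;
        volatile int cachedMessages;

        LocalQueue(int cachedMessages) {
            this.cachedMessages = cachedMessages;
        }
    }

    final ConcurrentMap<String, LocalQueue> processQueueTable = new ConcurrentHashMap<>();
    final Map<String, Long> localOffsets = new ConcurrentHashMap<>();
    volatile boolean paused;

    synchronized void resetOffset(Map<String, Long> offsetTable, long waitMillis) {
        paused = true; // step 1: suspend consumption
        try {
            // step 2: drop and clear every affected local queue
            for (Map.Entry<String, LocalQueue> entry : processQueueTable.entrySet()) {
                if (offsetTable.containsKey(entry.getKey())) {
                    entry.getValue().dropped = true;
                    entry.getValue().cachedMessages = 0;
                }
            }
            // step 3: wait before touching the offsets
            try {
                TimeUnit.MILLISECONDS.sleep(waitMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // step 4: store the new offsets and forget the affected queues
            Iterator<String> iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                String mq = iterator.next();
                Long offset = offsetTable.get(mq);
                if (offset != null) {
                    localOffsets.put(mq, offset);
                    iterator.remove();
                }
            }
        } finally {
            paused = false; // step 5: resume consumption
        }
    }
}
```

    Removing the queues from the table rather than patching them in place presumably lets a later rebalance re-create them, so that pulling restarts from the newly stored offset. Queues that do not appear in offsetTable are left untouched.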

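Summary

    In short, when consumer offsets are managed by the client, a reset is initiated by a client (here the dashboard, through the admin API) and sent to the broker, and it is ultimately the broker that notifies all consumer clients of the group to update their local offsets.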
