RocketMQ broker停写功能源码分析

2023年 7月 26日 63.5k 0

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

背景

在我们要平滑升级broker的时候,无损升级的最佳实践应该是

  • 新broker启动
  • 旧broker停写
  • 旧broker消息消费完成后下线(包括延时消息)
  • 所以我们本次就是来分析如何完成broker的停写

    源码入口

    其实通过查看源码,我们发现有两种方式可以停写broker

  • 通过mqadmin运维工具命令行的方式
  • 通过MQAdminExt管理工具类
  • 两种方式没有区别,都是调用DefaultMQAdminExtwipeWritePermOfBroker方法

    源码分析

    client

    我们这里直接进去到最底层的实现类代码

    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
            final long timeoutMillis) throws RemotingCommandException,
            RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
            WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
            requestHeader.setBrokerName(brokerName);
    
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, requestHeader);
            RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    WipeWritePermOfBrokerResponseHeader responseHeader =
                        (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
                    return responseHeader.getWipeTopicCount();
                }
                default:
                    break;
            }
    
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    
    

    客户端的代码我们没什么好看的,我们还是通过请求状态码RequestCode.WIPE_WRITE_PERM_OF_BROKER查看实际的Nameserver的处理逻辑

    NameServer

    • DefaultRequestProcessor的方法 wipeWritePermOfBroker

    可以看到实际的代码逻辑在这一行

    int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
    

    我们进入到这个方法看看

    可以看到实际的逻辑还在更下层,我们再进去看看

    private int operateWritePermOfBroker(final String brokerName, final int requestCode) {
            int topicCnt = 0;
    
            for (Entry entry : this.topicQueueTable.entrySet()) {
                Map qdMap = entry.getValue();
    
                final QueueData qd = qdMap.get(brokerName);
                if (qd == null) {
                    continue;
                }
                int perm = qd.getPerm();
                switch (requestCode) {
                    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                        perm &= ~PermName.PERM_WRITE;
                        break;
                    case RequestCode.ADD_WRITE_PERM_OF_BROKER:
                        perm = PermName.PERM_READ | PermName.PERM_WRITE;
                        break;
                }
                qd.setPerm(perm);
                topicCnt++;
            }
            return topicCnt;
        }
    

    这里的topicQueueTable我们可以看看他实际的数据结构

    可以看到这里他的perm7

    如果我们去broker的 config的topics.json查看相关的配置信息

    可以看到刚好可以对得上

    这里我们解释一下perm的权限问题

    • PERM_PRIORITY = 0x1

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论