这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
背景
在我们要平滑升级broker的时候,无损升级的最佳实践应该是
所以我们本次就是来分析如何完成broker
的停写
源码入口
其实通过查看源码,我们发现有两种方式可以停写broker
两种方式没有区别,都是调用DefaultMQAdminExt
的wipeWritePermOfBroker
方法
源码分析
client
我们这里直接进去到最底层的实现类代码
public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
final long timeoutMillis) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
WipeWritePermOfBrokerResponseHeader responseHeader =
(WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
return responseHeader.getWipeTopicCount();
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
客户端的代码我们没什么好看的,我们还是通过请求状态码RequestCode.WIPE_WRITE_PERM_OF_BROKER
查看实际的Nameserver
的处理逻辑
NameServer
DefaultRequestProcessor
的方法wipeWritePermOfBroker
可以看到实际的代码逻辑在这一行
int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
我们进入到这个方法看看
可以看到实际的逻辑还在更下层,我们再进去看看
private int operateWritePermOfBroker(final String brokerName, final int requestCode) {
int topicCnt = 0;
for (Entry entry : this.topicQueueTable.entrySet()) {
Map qdMap = entry.getValue();
final QueueData qd = qdMap.get(brokerName);
if (qd == null) {
continue;
}
int perm = qd.getPerm();
switch (requestCode) {
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
perm &= ~PermName.PERM_WRITE;
break;
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
perm = PermName.PERM_READ | PermName.PERM_WRITE;
break;
}
qd.setPerm(perm);
topicCnt++;
}
return topicCnt;
}
这里的topicQueueTable
我们可以看看他实际的数据结构
可以看到这里他的perm
是7
如果我们去broker
的 config的topics.json
查看相关的配置信息
可以看到刚好可以对得上
这里我们解释一下perm
的权限问题
PERM_PRIORITY = 0x1