抛出几个问题。
第一个是producer是怎么在开始的时候获取namesrv的地址;
第二个是producer获取namesrv的方式有几种;
第三个是produce是怎么刷新namesrv地址的;
producer获取namesrv地址
在开始时注入namesrv
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
如代码所示,在生成producer的时候可以注入namesrv的地址,然后会在producer.start()
里面去配置namesrv地址,具体位置:
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)`
// (获取mQClientFactory)
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 往里走到真正存储namesrv的方法
// org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance(org.apache.rocketmq.client.ClientConfig, int, java.lang.String, org.apache.rocketmq.remoting.RPCHook)
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
// 解析namesrv的方法
// org.apache.rocketmq.client.impl.MQClientAPIImpl#updateNameServerAddressList
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List list = Arrays.asList(addrArray);
this.remotingClient.updateNameServerAddressList(list);
}
由上面代码可以看出,namesrv在传的时候可以传多个,只要用 ; 分割就行,但是这种方法我们一般仅仅在自己调试的时候使用,因为这个办法比较笨,当线上的namesrv出问题或者需要扩容缩容的时候都没法解决。
动态获取namesrv
为了解决namesrv动态增删的问题,rocketMQ给出了一种方式,跟着源码看一下:
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)`
// 212行
mQClientFactory.start();
// org.apache.rocketmq.client.impl.factory.MQClientInstance#start
// 232行
// 在启动的时候会检查是否已经配置了namesrv地址,如果没有的话,就去远端抓取
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 我们直接进到远端抓取namesrv地址最里面的方法
// org.apache.rocketmq.common.namesrv.TopAddressing#fetchNSAddr(boolean, long)
public final String fetchNSAddr(boolean verbose, long timeoutMills) {
String url = this.wsAddr;
try {
if (!UtilAll.isBlank(this.unitName)) {
url = url + "-" + this.unitName + "?nofix=1";
}
// 请求远端地址,获取namesrv地址
HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
if (200 == result.code) {
String responseStr = result.content;
if (responseStr != null) {
// 解析远端返回的地址
return clearNewLine(responseStr);
} else {
log.error("fetch nameserver address is null");
}
} else {
log.error("fetch nameserver address failed. statusCode=" + result.code);
}
} catch (IOException e) {
if (verbose) {
log.error("fetch name server address exception", e);
}
}
我们看到远端的namesrv地址就是在这个上面去拉取,但是url却不知道是怎么来的,看到第一行代码String url = this.wsAddr;
,那么继续跟踪wsAddr
这个变量,发现是在这赋值的:
// org.apache.rocketmq.common.MixAll#getWSAddr
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
if (wsDomainName.indexOf(":") > 0) {
wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
}
return wsAddr;
}
// DEFAULT_NAMESRV_ADDR_LOOKUP = jmenv.tbsite.net
所以看出url的默认地址就是:jmenv.tbsite.net:8080/rocketmq/ns…
我们可以在本地演示一下:
System.setProperty("rocketmq.namesrv.domain", "127.0.0.1:8081");
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
然后另启动一个服务,写一个获取地址的接口:
@RestController
@RequestMapping(value = "/rocketmq")
@Slf4j
public class RocketMQConfigController {
@GetMapping(value = "/nsaddr")
public String queryUserInfo(@RequestParam(required = false) String nofix) {
String address = "127.0.0.1:9876";
log.info("获取rocketmq nameserver address:{}, nofix:{}", address, nofix);
return address;
}
}
这样就可以把namesrv地址的获取过程变成动态的了,如下图:
刷新namesrv地址
刷新namesrv地址其实本身是启动了一个线程池去定时拉取远端地址,启动线程池的地方和第一次拉取远端地址的代码在一格方法内:
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)`
// 212行
mQClientFactory.start();
// org.apache.rocketmq.client.impl.factory.MQClientInstance#start
// 238行
// Start various schedule tasks
this.startScheduledTask();
// org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 后续的线程池启动代码忽略...
}