RocketMQ producer获取Namesrv地址

2023年 7月 17日 52.6k 0

抛出几个问题。
第一个是producer是怎么在开始的时候获取namesrv的地址;
第二个是producer获取namesrv的方式有几种;
第三个是produce是怎么刷新namesrv地址的;

producer获取namesrv地址

在开始时注入namesrv

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

如代码所示,在生成producer的时候可以注入namesrv的地址,然后会在producer.start()里面去配置namesrv地址,具体位置:

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)`
// (获取mQClientFactory)
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

// 往里走到真正存储namesrv的方法
// org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance(org.apache.rocketmq.client.ClientConfig, int, java.lang.String, org.apache.rocketmq.remoting.RPCHook)
if (this.clientConfig.getNamesrvAddr() != null) {
    this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
    log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}

// 解析namesrv的方法
// org.apache.rocketmq.client.impl.MQClientAPIImpl#updateNameServerAddressList
public void updateNameServerAddressList(final String addrs) {
    String[] addrArray = addrs.split(";");
    List list = Arrays.asList(addrArray);
    this.remotingClient.updateNameServerAddressList(list);
}

由上面代码可以看出,namesrv在传的时候可以传多个,只要用 ; 分割就行,但是这种方法我们一般仅仅在自己调试的时候使用,因为这个办法比较笨,当线上的namesrv出问题或者需要扩容缩容的时候都没法解决。

动态获取namesrv

为了解决namesrv动态增删的问题,rocketMQ给出了一种方式,跟着源码看一下:

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)`
// 212行
mQClientFactory.start();

// org.apache.rocketmq.client.impl.factory.MQClientInstance#start
// 232行
// 在启动的时候会检查是否已经配置了namesrv地址,如果没有的话,就去远端抓取
if (null == this.clientConfig.getNamesrvAddr()) {
    this.mQClientAPIImpl.fetchNameServerAddr();
}


// 我们直接进到远端抓取namesrv地址最里面的方法
// org.apache.rocketmq.common.namesrv.TopAddressing#fetchNSAddr(boolean, long)
    public final String fetchNSAddr(boolean verbose, long timeoutMills) {
        String url = this.wsAddr;
        try {
            if (!UtilAll.isBlank(this.unitName)) {
                url = url + "-" + this.unitName + "?nofix=1";
            }
            // 请求远端地址,获取namesrv地址
            HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
            if (200 == result.code) {
                String responseStr = result.content;
                if (responseStr != null) {
                    // 解析远端返回的地址
                    return clearNewLine(responseStr);
                } else {
                    log.error("fetch nameserver address is null");
                }
            } else {
                log.error("fetch nameserver address failed. statusCode=" + result.code);
            }
        } catch (IOException e) {
            if (verbose) {
                log.error("fetch name server address exception", e);
            }
        }

我们看到远端的namesrv地址就是在这个上面去拉取,但是url却不知道是怎么来的,看到第一行代码String url = this.wsAddr;,那么继续跟踪wsAddr这个变量,发现是在这赋值的:

// org.apache.rocketmq.common.MixAll#getWSAddr
    public static String getWSAddr() {
        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
        if (wsDomainName.indexOf(":") > 0) {
            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
        }
        return wsAddr;
    }
// DEFAULT_NAMESRV_ADDR_LOOKUP = jmenv.tbsite.net

所以看出url的默认地址就是:jmenv.tbsite.net:8080/rocketmq/ns…
我们可以在本地演示一下:

System.setProperty("rocketmq.namesrv.domain", "127.0.0.1:8081");

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();

然后另启动一个服务,写一个获取地址的接口:

@RestController
@RequestMapping(value = "/rocketmq")
@Slf4j
public class RocketMQConfigController {

    @GetMapping(value = "/nsaddr")
    public String queryUserInfo(@RequestParam(required = false) String nofix) {
        String address = "127.0.0.1:9876";
        log.info("获取rocketmq nameserver address:{}, nofix:{}", address, nofix);

        return address;
    }
}

这样就可以把namesrv地址的获取过程变成动态的了,如下图:

RocketMQ-namesrv远端地址.png

刷新namesrv地址

刷新namesrv地址其实本身是启动了一个线程池去定时拉取远端地址,启动线程池的地方和第一次拉取远端地址的代码在一格方法内:

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)`
// 212行
mQClientFactory.start();

// org.apache.rocketmq.client.impl.factory.MQClientInstance#start
// 238行
// Start various schedule tasks
this.startScheduledTask();

// org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
        // 后续的线程池启动代码忽略...
    }

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论