前言
本章基于rocketmq4.6.0分析nameserver的实现。
nameserver的基础概念不再赘述,可以参考官网。
本章将分析以下内容:
- 借nameserver,分析rocketmq-remoting通讯层通用模块
- nameserver的kv配置管理
- nameserver的路由信息管理
后续分析不会再看nameserver侧,有个概念即可。
一、概览
nameserver侧比较简单,一共不超过10个类。
NamesrvStartup:启动类,读取配置,构造NamesrvController启动;
NamesrvController:管理nameserver所有组件,比如:通讯server、kv配置管理、路由信息管理等;
KVConfigManager:kv配置管理,kv配置的增删改查;
RouteInfoManager:路由信息管理,路由信息的增删改查;
BrokerHousekeepingService:当通讯层长连接发生变更,调用RouteInfoManager处理;
DefaultRequestProcessor:业务请求处理器,处理所有客户端的请求,包括broker和client;
二、Server端通讯层
所有组件的通讯层都会使用rocketmq-remoting模块,无论客户端还是服务端,无论是producer还是consumer还是broker还是nameserver。
这里简单分析一下服务端的实现,即NettyRemotingServer。
1、协议
绕过底层bytebuffer的读写,业务上都是RemotingCommand模型,无论请求报文还是响应报文。
- code:业务报文编码,对于请求来说,区分请求类型,对于响应来说,0代表成功,非0代表异常
- language:语言类型
- version:rocketmq的版本号
- opaque:请求id,由请求方生成,内存自增
- flag:标志位,目前只有两个二进制位被使用,一位用于区分是请求还是响应报文,一位用于区分是否是oneway请求
- remark:提示信息
- extFields:自定义头部
- serializeTypeCurrentRPC:序列化方式,默认json
- body:请求体
2、线程模型
Netty
NettyRemotingServer#start:
Netty线程分为三组
- eventLoopGroupBoss:接收连接,1个线程;
- eventLoopGroupSelector:io线程组,3个线程,可配置serverSelectorThreads;
- defaultEventExecutorGroup:worker线程组,处理编解码等业务,8个线程,可配置serverWorkerThreads;
业务线程
无论客户端还是服务端,当netty的worker线程处理完编解码后,区分请求还是响应,走不同逻辑。
NettyRemotingAbstract#processMessageReceived:
NettyRemotingAbstract#processRequestCommand:
对于请求来说,根据请求业务编码,找对应NettyRequestProcessor处理器,及其业务线程池处理。
NettyRemotingAbstract.processorTable:
NettyRemotingAbstract#defaultRequestProcessor:
对于没有特殊配置的业务请求,使用默认的Processor处理。
NamesrvController#registerProcessor:nameserver就只有一个Processor。
nameserver的业务线程池大小默认也是8,通过serverWorkerThreads配置。
NettyRemotingAbstract#processResponseCommand:
对于响应来说,根据请求id找到future,
rpc都差不多,发送请求的时候把id和future全局存一下,收到响应根据id找future
如果future是注册了callback回调方法的,使用future的callback线程池执行回调,
否则netty的worker线程,将response写入future,唤醒阻塞等待响应的线程。
超时请求检测
NettyRemotingServer#start:这里是服务端,但是客户端也类似,开启一个线程扫描超时请求。
NettyRemotingAbstract#responseTable:当前已经发出,但未收到相应的请求。
三、kv配置
1、KVConfigManager
内存中维护一份Map,存储Namespace-Key-Value。
KVConfigManager#putKVConfig:写kv配置,先拿写锁写内存,后拿读锁写磁盘。
KVConfigManager#getKVConfig:拿读锁读配置。
默认kv配置会存储在user.home下的namesrv/kvConfig.json中。
{"configTable":{"namespace":{"key":"value"}}}
2、用途
目前只有一个地方在用这个kv配置,针对生产者,场景是顺序消息。
假设有两个broker,每个broker有8个queue。
对于生产者发现路由来说,当一个broker下线后,原来是16个队列hash,现在变成8个队列hash,那么有可能乱序。
通过在nameserver侧写死路由表,可以保证不发生上面的情况,问题在于消息投递到下线的broker会报错。
比如updateTopic命令,设置TopicA为顺序topic。
mqadmin updateTopic -n localhost:9876 -b 169.254.246.163:10911 -t topicA -r 8 -w 8 -o true
则会在namespace=ORDER_TOPIC_CONFIG,新增kv配置,key=topic名,value=broker名1:队列数量1;broker名2:队列数量2。见UpdateTopicSubCommand#execute。
MQClientInstance#topicRouteData2TopicPublishInfo:
生产者获取路由表时,如果topic在nameserver被配置为顺序消息topic,将采用静态的路由表给生产者。
但是,默认情况下nameserver并没有开启这个功能。
四、路由信息
1、RouteInfoManager
RouteInfoManager基于内存存储路由信息,一共5个table。
2、broker注册
DefaultRequestProcessor#registerBrokerWithFilterServer:
broker注册请求包含broker配置信息和topic配置信息;
broker响应请求包含master地址和所有topic的静态路由信息(kv配置);
写入5个table
RouteInfoManager#registerBroker:先获取写锁,处理内存5个table
step1,clusterAddrTable,注册cluster到brokerName的映射关系。
step2,brokerAddrTable,注册brokerName到BrokerData的关系。
BrokerData包含同名broker的拓扑关系。
step3,topicQueueTable,注册topic到QueueData(队列)的映射关系,其中QueueData包含brokerName。
Step4,brokerLiveTable,注册broker地址到存活broker信息的映射关系。
Step5,filterServerTable,注册broker地址到filterServer的映射关系。sql92过滤相关。
Step6,如果注册broker非master,从brokerAddrTable获取masterBroker地址,从brokerLiveTable获取masterBroker的ha地址,返回broker。
触发条件
broker注册分为两大类。
一个类是broker启动或故障恢复之后注册。
举个例子,BrokerController#start:
启动后注册,定时注册,默认30s一次。
另一类是topic变更触发broker注册。
举个例子,TopicConfigManager#createTopicInSendMessageMethod:
broker自动创建topic,触发broker注册,实际是为了刷新topic配置。
3、broker注销
主动注销
broker正常关闭,主动调用nameserver注销。
DefaultRequestProcessor#unregisterBroker:
RouteInfoManager#unregisterBroker:
注销broker也是先拿写锁,然后操作5个table。
需要注意的是,除了两个纯broker纬度的table,其他table的处理是有条件的。
- brokerAddrTable:当同名broker全注销后,才移除BrokerData;
- clusterAddrTable:当同名broker全注销后,才移除cluster对应broker关系,如果cluster下broker为空,移除cluster;
- topicQueueTable:当同名broker全注销后,才移除brokerName对应的所有QueueData;
通讯层注销
BrokerHousekeepingService:
当通讯层连接关闭、异常、空闲,都会被动触发broker注销。
RouteInfoManager#onChannelDestroy:
对于RouteInfoManager来说,和主动注销的区别在于,
需要先获取读锁,通过通讯层的netty channel,找到存活brokerTable中的broker信息,执行上述5个table的remove操作。
定时心跳检测注销
NamesrvController#initialize:
nameserver后台每隔10秒会扫描非活跃broker执行注销。
RouteInfoManager#scanNotActiveBroker:
如果broker2分钟内未发送注册请求,同样会执行注销。
4、查询路由
最常用的方法是根据topic查询路由TopicRouteData,无论生产消费都需要。
DefaultRequestProcessor#getRouteInfoByTopic:
- 根据topic查询TopicRouteData;
- 如果nameserver开启orderMessage,默认false,查询topic静态路由配置;
TopicRouteData:包含topic下所有queue,以及broker信息。
RouteInfoManager#pickupTopicRouteData:从几个table中根据topic查路由。
- 根据topic找QueueData;
- 根据QueueData上的brokerName找BrokerData;
- 根据BrokerData上的brokerAddr找filter server;
结合broker注销,只要有存活的同名broker,都能拿到QueueData。
总结
RocketMQ通讯协议是个私有协议,主要包含几部分:
- code:业务报文编码,对于请求来说,区分请求类型,对于响应来说,0代表成功,非0代表异常
- language:语言类型
- version:rocketmq的版本号
- opaque:请求id,由请求方生成,内存自增
- flag:标志位,一位用于区分是请求还是响应报文,一位用于区分是否是oneway请求
- remark:提示信息
- extFields:自定义头部
- serializeTypeCurrentRPC:序列化方式,默认json
- body:请求体
Server端线程模型如下:
Client端线程模型如下(本文没提,见NettyRemotingClient):
nameserver提供动态kv配置能力,目前只有一个用途,针对顺序消费场景,生产者可以走nameserver提供的静态路由配置。但是这个能力默认nameserver是禁用的,orderMessageEnable=false。
broker注册和心跳,在nameserver侧维护了5张table。
对于producer和consumer来说,最重要的是其中两张table提供的路由数据,topicQueueTable和brokerAddrTable。
BrokerLiveInfo主要存储了broker的通讯层channel、心跳时间、ha地址等信息。
有三种方式导致broker从nameserver注销:broker主动注销、通讯层注销、nameserver定时检测心跳。
无论是通讯层空闲还是心跳检测,超时时间都是120秒。
BrokerLiveInfo移除后,只有所有同名broker下线,才会导致topicQueueTable和brokerAddrTable移除路由数据。
欢迎大家评论或私信讨论问题。
本文原创,未经许可不得转载。
欢迎关注公众号【程序猿阿越】。