一、Dubbo服务暴露流程简述
需要明确一点:Dubbo采用URL的方式来作为约定的参数类型。
protocol://username:password@host:port/path?key=value&key=value
- protocol: 指的是dubbo中的各种协议,比如dubbo, thrift, http
- username:password: 用户名:密码
- host:port: 主机:端口
- path: 接口全限定名
- parameters: 参数键值对
ServiceBean实现了ApplicationListener,监听ContextRefreshedEvent事件:在Spring IOC容器刷新完成后会调用onApplicationEvent方法,这里就是服务暴露的启动点。暴露时先根据配置得到URL,再利用Dubbo SPI机制根据URL的参数选择对应的实现类,实现扩展。
通过javassist动态封装服务实现类,统一暴露出Invoker使得调用方便,屏蔽底层实现细节,然后封装成exporter存储起来,等待消费者的调用,并且会将URL注册到注册中心,使得消费者可以获取服务提供者的信息。
一个服务如果配置了多个协议,那么每个协议都需要暴露。比如同时支持dubbo协议和hessian协议,就需要用这两种协议分别向每个注册中心(如果配置了多个的话)暴露该服务。
二、Dubbo服务暴露详细流程
如下图所示,为Dubbo服务暴露的完整流程。
/**
 * Namespace handler for the Dubbo XML elements: binds each element name to a
 * {@link DubboBeanDefinitionParser} created for the matching config class.
 */
public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        // Executed once, when this class is initialized.
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public DubboNamespaceHandler() {
    }

    /**
     * Registers one parser per supported element name. Every element passes
     * {@code true} as the parser's second constructor argument except
     * "reference", which passes {@code false}.
     */
    public void init() {
        register("application", ApplicationConfig.class, true);
        register("module", ModuleConfig.class, true);
        register("registry", RegistryConfig.class, true);
        register("monitor", MonitorConfig.class, true);
        register("provider", ProviderConfig.class, true);
        register("consumer", ConsumerConfig.class, true);
        register("protocol", ProtocolConfig.class, true);
        register("service", ServiceBean.class, true);
        register("reference", ReferenceBean.class, false);
        register("annotation", AnnotationBean.class, true);
    }

    /** Creates a parser for {@code beanClass} and registers it under {@code elementName}. */
    private void register(String elementName, Class<?> beanClass, boolean required) {
        registerBeanDefinitionParser(elementName, new DubboBeanDefinitionParser(beanClass, required));
    }
}
// Parses the Dubbo configuration file and produces BeanDefinitions.
public class DubboBeanDefinitionParser implements BeanDefinitionParser {
// Delegates to a four-argument parse(...) overload, forwarding this parser's
// beanClass and required fields.
// NOTE(review): that overload and both fields are not shown in this excerpt.
public BeanDefinition parse(Element element, ParserContext parserContext) {
return parse(element, parserContext, this.beanClass, this.required);
}
}
ServiceBean实现了InitializingBean和ApplicationListener这两个接口,所以在ServiceBean实例化过程中,会调用afterPropertiesSet()方法;在Spring IOC容器完成刷新之后,会调用onApplicationEvent(ContextRefreshedEvent event)方法,这里是服务暴露的启动点。
public class ServiceBean extends ServiceConfig implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
/**
 * Called once the IoC container has finished refreshing. Exports the service
 * only when {@code isDelay()} is true and the service has been neither
 * exported nor unexported yet.
 *
 * @param event the context-refreshed event (not inspected here)
 */
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isDelay() || isExported() || isUnexported()) {
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("The service ready on spring started. service: " + getInterface());
    }
    // Service export starts here.
    export();
}
// Called while the ServiceBean is being instantiated.
public void afterPropertiesSet() throws Exception {
// Sets up the relationships between the classes declared in the Dubbo configuration file.
// NOTE(review): the original author marked this description as uncertain ("??"),
// and the method body is not shown in this excerpt.
}
}
/**
 * Excerpt of the service configuration showing the export path:
 * export() -> doExport() -> doExportUrls() -> doExportUrlsFor1Protocol().
 */
public class ServiceConfig extends AbstractServiceConfig {

    /**
     * Starts the export unless the {@code export} flag is explicitly false.
     * With a positive {@code delay} the delayed branch runs instead of an
     * immediate {@link #doExport()}.
     */
    public synchronized void export() {
        if (this.export != null && !this.export) {
            return;
        }
        if (this.delay != null && this.delay > 0) {
            // Delayed-export branch; its body is elided in this excerpt.
            ...
        } else {
            // Service export.
            doExport();
        }
    }

    /** Performs the export by exporting the service URLs. */
    protected synchronized void doExport() {
        doExportUrls();
    }

    /** Loads the registry URLs, then exports the service once per configured protocol. */
    private void doExportUrls() {
        // Load the registry URLs.
        List<URL> registryURLs = loadRegistries(true);
        // Several communication protocols may be configured; export the service for each one.
        for (Object candidate : this.protocols) {
            doExportUrlsFor1Protocol((ProtocolConfig) candidate, registryURLs);
        }
    }

    /**
     * Builds the service URL for one protocol and exports it locally and/or
     * remotely depending on the URL's "scope" parameter.
     *
     * @param protocolConfig the protocol to export with
     * @param registryURLs   registry URLs to export to; may be null or empty
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.isEmpty()) {
            // No protocol configured: default to dubbo.
            name = "dubbo";
        }

        // The injvm protocol does not use a registry.
        if ("injvm".equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }

        String pathPrefix = (contextPath != null && contextPath.length() != 0) ? contextPath + "/" : "";
        URL url = new URL(name, host, port, pathPrefix + this.path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
            ConfiguratorFactory factory = (ConfiguratorFactory) ExtensionLoader
                    .getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol());
            url = factory.getConfigurator(url).configure(url);
        }

        String scope = url.getParameter("scope");
        if ("none".equalsIgnoreCase(scope)) {
            return;
        }

        if (!"remote".equalsIgnoreCase(scope)) {
            // Local export.
            exportLocal(url);
        }

        // Remote export.
        if ("local".equalsIgnoreCase(scope)) {
            return;
        }

        if (registryURLs != null && !registryURLs.isEmpty()) {
            // Registry addresses are configured: export the service to every registry.
            for (URL registryURL : registryURLs) {
                exportInvoker(registryURL.addParameterAndEncoded("export", url.toFullString()));
            }
        } else {
            // No registry configured.
            exportInvoker(url);
        }
    }

    /**
     * Wraps the implementation, interface and URL into an Invoker (the proxy
     * class is generated through the SPI mechanism), exports it and records
     * the resulting exporter.
     */
    private void exportInvoker(URL invokerUrl) {
        Invoker invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, invokerUrl);
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
        // Service export.
        Exporter exporter = protocol.export(wrapperInvoker);
        this.exporters.add(exporter);
    }
}
首先调用RegistryProtocol.export(),然后再调用具体协议的export(),比如DubboProtocol.export()。
// Excerpt of the registry-side Protocol: exports the invoker locally, then registers the provider URL.
public class RegistryProtocol implements Protocol {
// Exports originInvoker through doLocalExport and, when the provider URL's "register"
// parameter is true (true is also the default passed to getParameter), registers the provider URL.
// NOTE(review): this excerpt never declares registryUrl and contains no return statement;
// the omitted lines must be restored from the full source before this compiles.
public Exporter export(final Invoker originInvoker) throws RpcException {
URL providerUrl = this.getProviderUrl(originInvoker);
// Export the Invoker.
RegistryProtocol.ExporterChangeableWrapper exporter = this.doLocalExport(originInvoker, providerUrl);
URL registeredProviderUrl = this.getUrlToRegistry(providerUrl, registryUrl);
boolean register = providerUrl.getParameter("register", true);
if (register) {
// Store the service provider's address in the registry.
this.register(registryUrl, registeredProviderUrl);
}
}
// Returns the exporter wrapper stored in bounds under the invoker's cache key, creating it
// via computeIfAbsent when no entry exists for that key.
private RegistryProtocol.ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) {
String key = this.getCacheKey(originInvoker);
return (RegistryProtocol.ExporterChangeableWrapper)this.bounds.computeIfAbsent(key, (s) -> {
Invoker invokerDelegate = new RegistryProtocol.InvokerDelegate(originInvoker, providerUrl);
// Through the SPI mechanism this ends up calling DubboProtocol.export().
return new RegistryProtocol.ExporterChangeableWrapper(this.protocol.export(invokerDelegate), originInvoker);
});
}
}
/**
 * Excerpt of the dubbo protocol: wraps an invoker into an exporter and makes
 * sure a server exists for the exported address.
 */
public class DubboProtocol extends AbstractProtocol {

    /**
     * Wraps {@code invoker} into a DubboExporter stored under its service key,
     * then opens (or resets) the server for the invoker's URL.
     *
     * @param invoker the invoker to export
     * @return the exporter that was stored in the exporter map
     */
    public Exporter export(Invoker invoker) throws RpcException {
        URL serviceUrl = invoker.getUrl();
        String exporterKey = serviceKey(serviceUrl);

        // Wrap the invoker into a DubboExporter.
        DubboExporter wrapped = new DubboExporter(invoker, exporterKey, this.exporterMap);
        this.exporterMap.put(exporterKey, wrapped);

        // Open the server.
        openServer(serviceUrl);
        return wrapped;
    }

    /**
     * Ensures a server is registered for the URL's address when the "isserver"
     * parameter is true (its default here). An already-registered server is
     * reset with the URL; otherwise one is created under the instance lock
     * after re-checking the map.
     */
    private void openServer(URL url) {
        // IP:PORT the service is exported on, e.g. 127.0.0.1:20880.
        String address = url.getAddress();
        boolean serverSide = url.getParameter("isserver", true);
        if (!serverSide) {
            return;
        }

        ProtocolServer existing = (ProtocolServer) this.serverMap.get(address);
        if (existing != null) {
            existing.reset(url);
            return;
        }

        synchronized (this) {
            ProtocolServer rechecked = (ProtocolServer) this.serverMap.get(address);
            if (rechecked == null) {
                // First service exported on this address: create the server.
                this.serverMap.put(address, createServer(url));
            }
        }
    }

    /**
     * Adds the server-side URL parameters (filling in "channel.readonly.sent"
     * and "heartbeat" only if absent, setting "codec" to "dubbo"), binds an
     * exchange server to the resulting URL and wraps it.
     */
    private ProtocolServer createServer(URL url) {
        URL serverUrl = URLBuilder.from(url)
                .addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString())
                .addParameterIfAbsent("heartbeat", String.valueOf(60000))
                .addParameter("codec", "dubbo")
                .build();
        // Bind the URL (e.g. dubbo://192.168.1.101:20880/com.gmall.UserService)
        // together with DubboProtocol's request handler.
        return new DubboProtocolServer(Exchangers.bind(serverUrl, this.requestHandler));
    }
}
/** Static entry point for binding an exchange server. */
public class Exchangers {

    /**
     * Adds "codec"="exchange" to the URL if no codec is present, then binds
     * through the Exchanger selected for that URL.
     *
     * @param url     the URL to bind
     * @param handler the handler passed to the exchanger
     * @return the server returned by the exchanger
     */
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        URL exchangeUrl = url.addParameterIfAbsent("codec", "exchange");
        return getExchanger(exchangeUrl).bind(exchangeUrl, handler);
    }
}
/** Exchanger excerpt that builds the handler chain and binds it through Transporters. */
public class HeaderExchanger implements Exchanger {

    /**
     * Wraps {@code handler} in a HeaderExchangeHandler and then a DecodeHandler,
     * binds that chain via {@code Transporters.bind}, and wraps the result in a
     * HeaderExchangeServer.
     */
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
        DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
        ChannelHandler[] chain = new ChannelHandler[]{decodeHandler};
        return new HeaderExchangeServer(Transporters.bind(url, chain));
    }
}
/** Static entry point for binding a transport-level server. */
public class Transporters {

    /**
     * Binds a server for {@code url}. A single handler is used directly; any
     * other number of handlers is wrapped in a ChannelHandlerDispatcher.
     *
     * @param url      the URL to bind
     * @param handlers the channel handlers
     * @return the server returned by the selected transporter
     */
    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler = handlers.length == 1
                ? handlers[0]
                : new ChannelHandlerDispatcher(handlers);
        // Next, NettyTransporter is used to create a NettyServer.
        return getTransporter().bind(url, handler);
    }
}
// Transporter excerpt whose bind(...) returns a new NettyServer.
public class NettyTransporter implements Transporter {
// Creates a NettyServer for the given URL and handler; per the original note,
// the server listens on the communication port.
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
}
// Holder for a static table of provider invokers.
public class ProviderConsumerRegTable {
// Per the original note: the key is the URL and the value is the provider's invoker.
// NOTE(review): the map is declared with a raw type, so the key/value types cannot be confirmed from this excerpt.
public static ConcurrentHashMap providerInvokers = new ConcurrentHashMap();
}
/** Excerpt of the registry-side Protocol showing the registration step. */
public class RegistryProtocol implements Protocol {

    /**
     * Obtains the Registry for {@code registryUrl} from the registry factory
     * and registers {@code registeredProviderUrl} with it.
     */
    private void register(URL registryUrl, URL registeredProviderUrl) {
        this.registryFactory.getRegistry(registryUrl).register(registeredProviderUrl);
    }
}
三、简单的调用过程
Consumer--->URL--->Exporter--->Invoker--->xxxServiceImpl