Dubbo服务暴露流程

2023年 8月 18日 111.2k 0

一、Dubbo服务暴露流程简述

需要明确一点:Dubbo采用URL的方式来作为约定的参数类型。
protocol://username:password@host:port/path?key=value&key=value

  • protocol: 指的是dubbo中的各种协议,比如dubbo, thrift, http
  • username:password: 用户名:密码
  • host:port: 主机:端口
  • path: 接口全限定名
  • parameters: 参数键值对

ServiceBean实现了ApplicationListener, 监听ContextRefreshedEvent事件,在Spring IOC容器刷新完成后调用onApplicationEvent方法,服务暴露的启动点,根据配置得到URL,再利用Dubbo SPI机制根据URL的参数选择对应的实现类,实现扩展。

通过javassist动态封装服务实现类,统一暴露出Invoker使得调用方便,屏蔽底层实现细节,然后封装成exporter存储起来,等待消费者的调用,并且会将URL注册到注册中心,使得消费者可以获取服务提供者的信息。

一个服务如果有多个协议,那么就都需要暴露。比如同时支持dubbo协议和hessian协议,那么需要将这个服务用两种协议分别向多个注册中心(如果有多个的话)暴露服务。

  • 检测配置,如果有些配置为空的话会默认创建,并且组装成URL。
  • 根据URL进行服务暴露,创建代理类Invoker, 根据URL得知具体的协议,根据Dubbo SPI机制选取实现类实现exporter。
  • 如果只是本地暴露,将exporter存入ServiceConfig的缓存。
  • 远程暴露,先通过registry协议找到RegistryProtocol进行export,将URL中export=dubbo://...先转换成exporter。然后获取注册中心的相关配置,如果需要注册则向注册中心注册,并且在ProviderConsumerRegTable这个表中记录服务提供者,其实就是往一个ConcurrentHashmap中存入invoker, key就是服务接口全限定名,value是一个set,set里面会存包装过的invoker,根据URL上Dubbo协议暴露出exporter, 打开Server调用NettyServer来监听服务。
  • 二、Dubbo服务暴露详细流程

    如下图所示,为Dubbo服务暴露的完整流程。
    无标题.png

  • DubboNamespaceHandler类中定义了dubbo标签与java类的对应关系。
  • public class DubboNamespaceHandler extends NamespaceHandlerSupport {  
    public DubboNamespaceHandler() {  
    }  
      
    public void init() {  
    this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));  
    this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));  
    this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));  
    this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));  
    this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));  
    this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));  
    this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));  
    this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));  
    this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));  
    this.registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));  
    }  
      
    static {  
    Version.checkDuplicate(DubboNamespaceHandler.class);  
    }  
    }
    
  • DubboBeanDefinitionParser类会解析dubbo的xml文件,解析标签,生成BeanDefinition。
  • //解析dubbo配置文件,生成BeanDefinition
    public class DubboBeanDefinitionParser implements BeanDefinitionParser {
        public BeanDefinition parse(Element element, ParserContext parserContext) {
            return parse(element, parserContext, this.beanClass, this.required);
        }
    }
    
  • ServiceBean实现了InitializingBeanApplicationListener 这两个接口,所以在ServiceBean实例化过程中,会调用afterPropertiesSet()方法,在Spring IOC容器完成刷新之后,会调用onApplicationEvent(ContextRefreshedEvent event)方法,这里有服务暴露的启动点。
  • public class ServiceBean extends ServiceConfig implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
    
        //在IOC容器完成刷新事件后调用
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (this.isDelay() && !this.isExported() && !this.isUnexported()) {
                if (logger.isInfoEnabled()) {
                        logger.info("The service ready on spring started.                            service: " + this.getInterface());
                }
                //服务暴露
                this.export();
            }
    
        }
        
        //在ServiceBean实例化过程中调用
        public void afterPropertiesSet() throws Exception {
            //设置dubbo配置文件中的类间的关系??
        }
    }
    
  • ServiceConfig中提供了服务暴露的接口,通过Dubbo的SPI机制找到proxyFactory的实现JavasistProxyFactory,将服务实现类、服务提供接口、URL封装成JavasistProxyFactory类型的invoker,并调用protocol.export()将invoker暴露出去。
  • public class ServiceConfig extends AbstractServiceConfig {
    	public synchronized void export() {
            if (this.export == null || this.export) {
                if (this.delay != null && this.delay > 0) {
                    ...
                } else {
                	//服务暴露
                    this.doExport();
                }
            }
        }
    
        protected synchronized void doExport() {
        	this.doExportUrls();
        }
    
        private void doExportUrls() {
        	//加载注册中心url
            List registryURLs = this.loadRegistries(true);
            //可以配置多个通信协议
            Iterator i$ = this.protocols.iterator();
    
            while(i$.hasNext()) {
                ProtocolConfig protocolConfig = (ProtocolConfig)i$.next();
                //为每个协议暴露服务
                this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
    
        }
    
        private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
    
        	String name = protocolConfig.getName();
            if (name == null || name.length() == 0) {
            	//如果没有配置protocol,则默认为dubbo。
                name = "dubbo";
            }
    
            //injvm协议,不使用注册中心
            if ("injvm".equals(protocolConfig.getName())) {
                protocolConfig.setRegister(false);
                map.put("notify", "false");
            }
    
            URL url = new URL(name, host, port, (contextPath != null && contextPath.length() != 0 ? contextPath + "/" : "") + this.path, map);
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
                url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url);
            }
    
            String scope = url.getParameter("scope");
            if (!"none".toString().equalsIgnoreCase(scope)) {
                if (!"remote".toString().equalsIgnoreCase(scope)) {
                	//本地暴露
                    this.exportLocal(url);
                }
                //远程暴露
                if (!"local".toString().equalsIgnoreCase(scope)) {
                   	//配置了注册中心的地址
                    if (registryURLs != null && registryURLs.size() > 0) {
                        Iterator i$ = registryURLs.iterator();
                        //将服务暴露到所有的注册中心上
                        while(i$.hasNext()) {
                            URL registryURL = (URL)i$.next();
    
                            //将实现类、接口、URL封装到invoker中,通过SPI机制生成代理类
                            Invoker invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                           	//服务暴露
                            Exporter exporter = protocol.export(wrapperInvoker);
                            this.exporters.add(exporter);
                        }
                    } else {
                    	//未配置注册中心
                        Invoker invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        Exporter exporter = protocol.export(wrapperInvoker);
                        this.exporters.add(exporter);
                    }
                }
            }
        }
    
    }
    
  • 基于Dubbo的SPI机制,得到Protocol接口的实现。因为使用了注册中心,所以会发现RegistryRrotocol;根据使用的通信协议,找到对应的协议实现类,比如DubboProtocol。
    首先调用RegistryRrotocol.export(),然后调用比如DubboProtocol.export()。
  • public class RegistryProtocol implements Protocol {
        public  Exporter export(final Invoker originInvoker) throws RpcException {
            URL providerUrl = this.getProviderUrl(originInvoker);
            //暴露Invoker
            RegistryProtocol.ExporterChangeableWrapper exporter = this.doLocalExport(originInvoker, providerUrl);
            URL registeredProviderUrl = this.getUrlToRegistry(providerUrl, registryUrl);
            boolean register = providerUrl.getParameter("register", true);
            if (register) {
                //把服务提供者的地方保存在注册中心
                this.register(registryUrl, registeredProviderUrl);
            }
    
        }
    
        private  RegistryProtocol.ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) {
            String key = this.getCacheKey(originInvoker);
            return (RegistryProtocol.ExporterChangeableWrapper)this.bounds.computeIfAbsent(key, (s) -> {
                Invoker invokerDelegate = new RegistryProtocol.InvokerDelegate(originInvoker, providerUrl);
                
                //通过SPI机制调用DubboProtocol.export()。
                return new RegistryProtocol.ExporterChangeableWrapper(this.protocol.export(invokerDelegate), originInvoker);
            });
        }
    
    }
    
  • DubboProtocol中启动Netty服务器,监听20880端口。
  • public class DubboProtocol extends AbstractProtocol {
        public  Exporter export(Invoker invoker) throws RpcException {
            URL url = invoker.getUrl();
            String key = serviceKey(url);
            //将invoker包装成DubboExporter
            DubboExporter exporter = new DubboExporter(invoker, key, this.exporterMap);
            this.exporterMap.put(key, exporter);
            //打开服务器
            this.openServer(url);
            return exporter;
        }
        
        private void openServer(URL url) {
            //服务暴露的IP:PORT,比如127.0.0.1:20880
            String key = url.getAddress();
            boolean isServer = url.getParameter("isserver", true);
            if (isServer) {
                ProtocolServer server = (ProtocolServer)this.serverMap.get(key);
                if (server == null) {
                    synchronized(this) {
                        server = (ProtocolServer)this.serverMap.get(key);
                        if (server == null) {
                            //第一个服务暴露时,创建服务器
                            this.serverMap.put(key, this.createServer(url));
                        }
                    }
                } else {
                    server.reset(url);
                }
            }
    
        }
    
        private ProtocolServer createServer(URL url) {
            url = URLBuilder.from(url).addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString()).addParameterIfAbsent("heartbeat", String.valueOf(60000)).addParameter("codec", "dubbo").build();
    
            //绑定url(dubbo://192.168.1.101:20880/com.gmall.UserService)和请求处理器DubboProtocol
            ExchangeServer server = Exchangers.bind(url, this.requestHandler);
    
            return new DubboProtocolServer(server);
        }
        
    }
    
  • Netty服务器监控通信端口。
  • public class Exchangers {
        public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            url = url.addParameterIfAbsent("codec", "exchange");
            return getExchanger(url).bind(url, handler);
        }
    }
    
    public class HeaderExchanger implements Exchanger {
    
    
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
        }
    }
    
    public class Transporters {
       
        public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
            Object handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            //接下来使用NettyTransporter创建一个NettyServer
            return getTransporter().bind(url, (ChannelHandler)handler);
        }
    }
    
    public class NettyTransporter implements Transporter {
    
        //创建NettyServer,监听通信端口
        public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
            return new NettyServer(url, handler);
        }
    }
    
  • 成功启动Netty服务器后,回到RegistryProtocol中,有个ProviderConsumerRegTable,服务暴露时,将provider信息保存到table中。
  • public class ProviderConsumerRegTable {
    	//key是url, value是提供者的invoker
        public static ConcurrentHashMap providerInvokers = new ConcurrentHashMap();
    }
    
  • 将URL注册到注册中心。
  • public class RegistryProtocol implements Protocol {
    
        private void register(URL registryUrl, URL registeredProviderUrl) {
            Registry registry = this.registryFactory.getRegistry(registryUrl);
            registry.register(registeredProviderUrl);
        }
    }
    

    三、简单的调用过程

    Consumer--->URL--->Exporter--->Invoker--->xxxServiceImpl

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论