手撸RPC框架 服务提供者整合注册中心

2023年 7月 14日 45.1k 0

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在前面一章我们已经实现了以zookeeper的注册中心的基本功能,并实现了 服务注册,服务发现二个接口。接下来我们就是要将服务提供者注册到注册中心。

实现

在之前的服务提供者的测试类中,我们扫描的包路径是写死的,其实在最后整合springboot的时候,这些配置我们肯定得从配置文件中去动态获取的,所以接下来我们首先就是要修改配置相关的信息

package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {

    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer("com.xpc");
        baseServer.startNettyServer();
    }
}
  • com.xpc.rpc.register.common.config.RegisterConfig
package com.xpc.rpc.register.common.config;


import org.springframework.boot.context.properties.ConfigurationProperties;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * 注册的基本配置类,需要从项目的配置文件中读取
 */
@ConfigurationProperties(prefix = "xpc")
public class RegisterConfig {

    /**
     * 注册地址
     */
    private String registerAddress;

    /**
     * 注册类型
     */
    private String registerType;

    /**
     * Netty监听端口
     */
    private Integer registerPort;

    /**
     * 本机的IP地址
     */
    private String serverAddress;
    
    
    /**
     * 服务名称
     */
    private String applicationName;

    /**
     * 项目服务的监听端口
     */
    private Integer serverPort;


    /**
     * 扫描的包路径
     */
    private String packageName;

    
}
  • 新建配置自动化配置的配置类 com.xpc.rpc.register.common.config.RegisterAutoConfiguration
package com.xpc.rpc.register.common.config;

import com.xpc.rpc.register.common.cache.RegisterConfigCacheManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
@EnableConfigurationProperties(RegisterConfig.class)
public class RegisterAutoConfiguration implements InitializingBean {


    @Resource
    private RegisterConfig registerConfig;

    @Override
    public void afterPropertiesSet() throws Exception {
        //将配置类缓存起来,后续会使用到
        new RegisterConfigCacheManager(registerConfig);
    }
}

在 xpc-rpc-register-common项目下的resouorces目录下新建一个目录 META-INF,然后创建一个 spring.factories文件。文件内容如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.xpc.rpc.register.common.config.RegisterAutoConfiguration

修改服务提供者的启动类:com.xpc.rpc.provider.server.base.BaseServer

package com.xpc.rpc.provider.server.base;

import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.common.utils.RemotingUtil;
import com.xpc.rpc.provider.handler.RpcProviderHandler;
import com.xpc.rpc.provider.scanner.service.DubboServiceScanner;
import com.xpc.rpc.provider.server.api.Server;
import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.cache.RegisterConfigCacheManager;
import com.xpc.rpc.register.common.config.RegisterConfig;
import com.xpc.rpc.register.zookeeper.ZookeeperRegisterServiceImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class BaseServer implements Server {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServer.class);

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    private ServerBootstrap bootstrap;

    /**
     * @RpcService标注的类对应的接口的class
     */
    protected Map handlerMap = new HashMap();

    private RegisterService registerService;

    private RegisterConfig registerConfig;

    public BaseServer() {
        //从缓存中获取配置类信息
        this.registerConfig = RegisterConfigCacheManager.getRegisterConfig();
    }

    @Override
    public void startNettyServer() {
        registerService = new ZookeeperRegisterServiceImpl();
        registerService.init(registerConfig);
        try {
            this.handlerMap = DubboServiceScanner.doScanDubboServiceByPackages(this.registerConfig.getPackageName(),registerService,registerConfig);
        } catch (Exception e) {
            LOGGER.error("scan @DubboService error: {}",e);
        }
        /**
         * 性能优化
         */
        if(useEpoll()) {
            bossGroup = new EpollEventLoopGroup();
            workerGroup = new EpollEventLoopGroup();
        }else {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
        }
        bootstrap = new ServerBootstrap();

        bootstrap.group(bossGroup,workerGroup)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,128)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //我们自己的编解码器
                        pipeline.addLast(new RpcDecoder());
                        pipeline.addLast(new RpcEncoder());
                        //我们自己实现的处理器
                        pipeline.addLast(new RpcProviderHandler(handlerMap));
                    }
                });

        try {
            ChannelFuture channelFuture = bootstrap.bind(registerConfig.getServerAddress(), registerConfig.getRegisterPort()).sync();
            LOGGER.info("Netty 服务端启动成功............");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.error("Netty 服务端启动失败:{}",e);
        }finally {
            shutDown();
        }
    }

    @Override
    public void shutDown() {
        if(bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if(workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }
}

测试

我们新建一个子模块 xpc-rpc-web,并创建对应的项目

image.png

新建 xpc-rpc-web-interfaces,然后创建一个接口 com.xpc.interfaces.UserService

package com.xpc.interfaces;

public interface UserService {

    String hello(String name);
}

创建服务提供者 xpc-rpc-web-provider,pom.xml文件如下



    
        org.springframework.boot
        spring-boot-starter-parent
        2.2.6.RELEASE
    

    4.0.0

    xpc-rpc-web-provider

    
        8
        8
        UTF-8
        2.2.6.RELEASE
    

    
        
            
                org.springframework.boot
                spring-boot-dependencies
                ${spring.boot.version}
                pom
                import
            
        
    

    

        
            org.springframework.boot
            spring-boot-starter-test
            provided
        

        
            org.springframework.boot
            spring-boot-starter-web
            
                
                    org.springframework.boot
                    spring-boot-starter-tomcat
                
                
                    org.springframework.boot
                    spring-boot-starter-logging
                
            
        

        
            org.springframework.boot
            spring-boot-starter-undertow
        
       
        
        
        
            com.xpc
            xpc-rpc-provider
            1.0-SNAPSHOT
        
        
            com.xpc
            xpc-rpc-wen-interfaces
            1.0-SNAPSHOT
        
    


在 resources目录下创建 application.yml,内容如下:

xpc:
  registerAddress: 127.0.0.1:2181
  registerType: zookeeper
  registerPort: 21770
  packageName: com.xpc

创建启动类:

package com.xpc;

import com.xpc.rpc.provider.server.base.BaseServer;
import com.xpc.rpc.register.common.config.RegisterConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class,args);
    }

    @Resource
    private RegisterConfig registerConfig;

    @GetMapping("/test")
    public String test() {
        BaseServer baseServer = new BaseServer();
        baseServer.startNettyServer();
        return null;
    }
}

启动之后,在浏览器访问:localhost:8080/test

image.png

我们可以发现服务提供者已经成功启动了

接下来我们看下服务提供者是否已经注册到zookeeper中去了

image.png

可以看到,服务已经成功注册到zookeeper中了

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论