手撸RPC框架 注册中心基础功能实现

2023年 7月 14日 55.5k 0

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在之前我们实现了服务提供者发送请求,已经服务提供者收到请求以后调用业务真实方法,然后返回响应结果了。接下来我们需要完成的是注册中心的基础功能

一个完整的RPC框架,注册中心是其中最重要的一部分,服务提供者需要注册到注册中心,之后服务提供者需要从注册中心获取到服务提供者,然后才能发送请求。

实现

创建一个注册中心模块 xpc-rpc-register,然后创建二个子项目,分别是:xpc-rpc-register-common 和 xpc-rpc-register-zookeeper,项目结构如下

image.png

xpc-rpc-register-commonpom.xml文件如下



    
        xpc-rpc-register
        com.xpc
        1.0-SNAPSHOT
    
    4.0.0

    xpc-rpc-register-common


    
        
            com.xpc
            xpc-rpc-common
            1.0-SNAPSHOT
        
    

创建一个注册配置类 com.xpc.rpc.register.common.config.RegisterConfig

package com.xpc.rpc.register.common.config;

import com.xpc.rpc.common.utils.yml.BootYamlUtils;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * 注册的基本配置类
 */
public class RegisterConfig {

    /**
     * 注册地址
     */
    private String registerAddress;

    /**
     * 注册类型
     */
    private String registerType;

    /**
     * Netty监听端口
     */
    private Integer registerPort;

    /**
     * 服务名称
     */
    private String applicationName;

    /**
     * 项目服务的监听端口
     */
    private Integer serverPort;

    /**
     * 本机的IP地址
     */
    private String serverAddress;


    public RegisterConfig() {
        this.registerAddress = (String) BootYamlUtils.getProperties("register.address");
        this.registerType = (String)BootYamlUtils.getProperties("register.type");
        this.applicationName = (String)BootYamlUtils.getProperties("application.name");
        this.registerPort = (Integer)BootYamlUtils.getProperties("register.port");
        this.serverPort = (Integer)BootYamlUtils.getProperties("server.port");
        try {
            this.serverAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    public String getRegisterAddress() {
        return registerAddress;
    }

    public void setRegisterAddress(String registerAddress) {
        this.registerAddress = registerAddress;
    }

    public String getRegisterType() {
        return registerType;
    }

    public void setRegisterType(String registerType) {
        this.registerType = registerType;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }

    public Integer getRegisterPort() {
        return registerPort;
    }

    public void setRegisterPort(Integer registerPort) {
        this.registerPort = registerPort;
    }

    public Integer getServerPort() {
        return serverPort;
    }

    public void setServerPort(Integer serverPort) {
        this.serverPort = serverPort;
    }

    public String getServerAddress() {
        return serverAddress;
    }

    public void setServerAddress(String serverAddress) {
        this.serverAddress = serverAddress;
    }
}
  • registerType:注册类型,有zookeeper,nacos,consul等注册中心
  • registerAddress:注册中心的地址,zookeeper就是127.0.0.1:2181
  • registerPort:Netty启动需要绑定一个监听端口,也就是Netty服务端的监听端口
  • applicationName:服务名称,也就是我们自己项目的应用名称
  • serverPort:我们自己服务的监听端口
  • serverAddress:我们自己服务部署的机器的IP地址

在配置类的构造方法中,我们通过使用自己的工具类,从配置文件中获取对应的配置的值,然后进行赋值操作

创建一个服务注册接口 com.xpc.rpc.register.common.RegisterService

package com.xpc.rpc.register.common;

import com.xpc.rpc.protocol.meta.ServiceMeta;

public interface RegisterService {

      /**
       * 服务注册
       * @param serviceMeta 服务注册元数据
       */
      void register(ServiceMeta serviceMeta) throws Exception;

      /**
       * 服务发现
       * @param serviceName
       * @return
       * @throws Exception
       */
      ServiceMeta discovery(String serviceName) throws Exception;
}

接下来就是服务注册的元数据类:com.xpc.rpc.protocol.meta.ServiceMeta

package com.xpc.rpc.protocol.meta;

/**
 * 服务注册元数据
 */
public class ServiceMeta {

    private String interfaceName;

    /**
     * 本机的服务地址
     */
    private String serverAddress;

    /**
     * 服务端口号
     */

    private Integer serverPort;

    /**
     * Netty监听端口
     */
    private Integer registerPort;

    /**
     * 服务名称
     */
    private String applicationName;
    
    /**
     * 负载均衡策略
     */
    private String loanBalancedType;

    public String getLoanBalancedType() {
        return loanBalancedType;
    }

    public void setLoanBalancedType(String loanBalancedType) {
        this.loanBalancedType = loanBalancedType;
    }


    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public Integer getRegisterPort() {
        return registerPort;
    }

    public void setRegisterPort(Integer registerPort) {
        this.registerPort = registerPort;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }

    public String getServerAddress() {
        return serverAddress;
    }

    public void setServerAddress(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public Integer getServerPort() {
        return serverPort;
    }

    public void setServerPort(Integer serverPort) {
        this.serverPort = serverPort;
    }
}

服务注册和发现的接口设计好了,那么接下来就是去实现这二个接口具体功能了,具体的实现功能是在 com.xpc.rpc.register.zookeeper.ZookeeperRegisterServiceImpl 类中

package com.xpc.rpc.register.zookeeper;

import com.xpc.rpc.protocol.meta.ServiceMeta;
import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.config.RegisterConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;

/**
 * zookeeper注册中心功能实现
 */
public class ZookeeperRegisterServiceImpl implements RegisterService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRegisterServiceImpl.class);

    private static final int BASE_SLEEP_TIME_MS = 1000;

    private static final int MAX_RETRIES = 3;

    private static final String ZK_BASE_PATH = "/xpc_rpc";

    private ServiceDiscovery serviceDiscovery;

    private RegisterConfig registerConfig;


    public ZookeeperRegisterServiceImpl(RegisterConfig registerConfig) {
        this.registerConfig = registerConfig;
        CuratorFramework client = CuratorFrameworkFactory.newClient(registerConfig.getRegisterAddress(), new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
        client.start();
        JsonInstanceSerializer serializer = new JsonInstanceSerializer(ServiceMeta.class);
        this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class)
                .client(client)
                .serializer(serializer)
                .basePath(ZK_BASE_PATH)
                .build();
        try {
            this.serviceDiscovery.start();
        } catch (Exception e) {
            LOGGER.error("serviceDiscovery start error :{}",e);
        }
    }

    //服务注册
    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        ServiceInstance serviceInstance = ServiceInstance
                .builder()
                .name(serviceMeta.getApplicationName())
                .address(serviceMeta.getServerAddress())
                .port(serviceMeta.getServerPort())
                .payload(serviceMeta)
                .build();
        serviceDiscovery.registerService(serviceInstance);
    }

    //服务发现
    @Override
    public ServiceMeta discovery(String serviceName) throws Exception {
        Collection serviceInstances = serviceDiscovery.queryForInstances(serviceName);
        // TODO 后续会根据负载均衡策略来选取一个服务提供者
        return selectByLoadBalanceType((List)serviceInstances);
    }

    private ServiceMeta selectByLoadBalanceType(List list) {
        if(list == null || list.isEmpty()) {
            return null;
        }
        return list.get(0).getPayload();
    }
}

至此注册中心的功能就实现完毕了,接下来就是将服务提供者和服务消费者整合到注册中心

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论