大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16
前言
在之前我们实现了服务提供者发送请求,已经服务提供者收到请求以后调用业务真实方法,然后返回响应结果了。接下来我们需要完成的是注册中心的基础功能
一个完整的RPC框架,注册中心是其中最重要的一部分,服务提供者需要注册到注册中心,之后服务提供者需要从注册中心获取到服务提供者,然后才能发送请求。
实现
创建一个注册中心模块 xpc-rpc-register,然后创建二个子项目,分别是:xpc-rpc-register-common 和 xpc-rpc-register-zookeeper,项目结构如下
xpc-rpc-register-commonpom.xml文件如下
xpc-rpc-register
com.xpc
1.0-SNAPSHOT
4.0.0
xpc-rpc-register-common
com.xpc
xpc-rpc-common
1.0-SNAPSHOT
创建一个注册配置类 com.xpc.rpc.register.common.config.RegisterConfig
package com.xpc.rpc.register.common.config;
import com.xpc.rpc.common.utils.yml.BootYamlUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* 注册的基本配置类
*/
public class RegisterConfig {
/**
* 注册地址
*/
private String registerAddress;
/**
* 注册类型
*/
private String registerType;
/**
* Netty监听端口
*/
private Integer registerPort;
/**
* 服务名称
*/
private String applicationName;
/**
* 项目服务的监听端口
*/
private Integer serverPort;
/**
* 本机的IP地址
*/
private String serverAddress;
public RegisterConfig() {
this.registerAddress = (String) BootYamlUtils.getProperties("register.address");
this.registerType = (String)BootYamlUtils.getProperties("register.type");
this.applicationName = (String)BootYamlUtils.getProperties("application.name");
this.registerPort = (Integer)BootYamlUtils.getProperties("register.port");
this.serverPort = (Integer)BootYamlUtils.getProperties("server.port");
try {
this.serverAddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
public String getRegisterAddress() {
return registerAddress;
}
public void setRegisterAddress(String registerAddress) {
this.registerAddress = registerAddress;
}
public String getRegisterType() {
return registerType;
}
public void setRegisterType(String registerType) {
this.registerType = registerType;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public Integer getRegisterPort() {
return registerPort;
}
public void setRegisterPort(Integer registerPort) {
this.registerPort = registerPort;
}
public Integer getServerPort() {
return serverPort;
}
public void setServerPort(Integer serverPort) {
this.serverPort = serverPort;
}
public String getServerAddress() {
return serverAddress;
}
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}
}
- registerType:注册类型,有zookeeper,nacos,consul等注册中心
- registerAddress:注册中心的地址,zookeeper就是127.0.0.1:2181
- registerPort:Netty启动需要绑定一个监听端口,也就是Netty服务端的监听端口
- applicationName:服务名称,也就是我们自己项目的应用名称
- serverPort:我们自己服务的监听端口
- serverAddress:我们自己服务部署的机器的IP地址
在配置类的构造方法中,我们通过使用自己的工具类,从配置文件中获取对应的配置的值,然后进行赋值操作
创建一个服务注册接口 com.xpc.rpc.register.common.RegisterService
package com.xpc.rpc.register.common;
import com.xpc.rpc.protocol.meta.ServiceMeta;
public interface RegisterService {
/**
* 服务注册
* @param serviceMeta 服务注册元数据
*/
void register(ServiceMeta serviceMeta) throws Exception;
/**
* 服务发现
* @param serviceName
* @return
* @throws Exception
*/
ServiceMeta discovery(String serviceName) throws Exception;
}
接下来就是服务注册的元数据类:com.xpc.rpc.protocol.meta.ServiceMeta
package com.xpc.rpc.protocol.meta;
/**
* 服务注册元数据
*/
public class ServiceMeta {
private String interfaceName;
/**
* 本机的服务地址
*/
private String serverAddress;
/**
* 服务端口号
*/
private Integer serverPort;
/**
* Netty监听端口
*/
private Integer registerPort;
/**
* 服务名称
*/
private String applicationName;
/**
* 负载均衡策略
*/
private String loanBalancedType;
public String getLoanBalancedType() {
return loanBalancedType;
}
public void setLoanBalancedType(String loanBalancedType) {
this.loanBalancedType = loanBalancedType;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public Integer getRegisterPort() {
return registerPort;
}
public void setRegisterPort(Integer registerPort) {
this.registerPort = registerPort;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getServerAddress() {
return serverAddress;
}
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}
public Integer getServerPort() {
return serverPort;
}
public void setServerPort(Integer serverPort) {
this.serverPort = serverPort;
}
}
服务注册和发现的接口设计好了,那么接下来就是去实现这二个接口具体功能了,具体的实现功能是在 com.xpc.rpc.register.zookeeper.ZookeeperRegisterServiceImpl 类中
package com.xpc.rpc.register.zookeeper;
import com.xpc.rpc.protocol.meta.ServiceMeta;
import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.config.RegisterConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
/**
* zookeeper注册中心功能实现
*/
public class ZookeeperRegisterServiceImpl implements RegisterService {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRegisterServiceImpl.class);
private static final int BASE_SLEEP_TIME_MS = 1000;
private static final int MAX_RETRIES = 3;
private static final String ZK_BASE_PATH = "/xpc_rpc";
private ServiceDiscovery serviceDiscovery;
private RegisterConfig registerConfig;
public ZookeeperRegisterServiceImpl(RegisterConfig registerConfig) {
this.registerConfig = registerConfig;
CuratorFramework client = CuratorFrameworkFactory.newClient(registerConfig.getRegisterAddress(), new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
client.start();
JsonInstanceSerializer serializer = new JsonInstanceSerializer(ServiceMeta.class);
this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class)
.client(client)
.serializer(serializer)
.basePath(ZK_BASE_PATH)
.build();
try {
this.serviceDiscovery.start();
} catch (Exception e) {
LOGGER.error("serviceDiscovery start error :{}",e);
}
}
//服务注册
@Override
public void register(ServiceMeta serviceMeta) throws Exception {
ServiceInstance serviceInstance = ServiceInstance
.builder()
.name(serviceMeta.getApplicationName())
.address(serviceMeta.getServerAddress())
.port(serviceMeta.getServerPort())
.payload(serviceMeta)
.build();
serviceDiscovery.registerService(serviceInstance);
}
//服务发现
@Override
public ServiceMeta discovery(String serviceName) throws Exception {
Collection serviceInstances = serviceDiscovery.queryForInstances(serviceName);
// TODO 后续会根据负载均衡策略来选取一个服务提供者
return selectByLoadBalanceType((List)serviceInstances);
}
private ServiceMeta selectByLoadBalanceType(List list) {
if(list == null || list.isEmpty()) {
return null;
}
return list.get(0).getPayload();
}
}
至此注册中心的功能就实现完毕了,接下来就是将服务提供者和服务消费者整合到注册中心