大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16
前言
在上一章中我们已经完成了SPI注解的实现,以及核心类的实现,并完成了测试。从这章开始,我们将会使用SPI机制扩展负载均衡策略。
实现
新建 xpc-rpc-loadbalancer模块
- 负载均衡接口类:com.xpc.rpc.loadbalancer.api.ServiceLoadBalancerApi
package com.xpc.rpc.loadbalancer.api;
import com.xpc.rpc.annotation.SPI;
import java.util.List;
@SPI
public interface ServiceLoadBalancerApi {
T select(List services);
}
- 随机负载均衡策略实现类:com.xpc.rpc.loadbalancer.random.RamdomLoadbalancer
package com.xpc.rpc.loadbalancer.random;
import com.xpc.rpc.annotation.SPIClass;
import com.xpc.rpc.loadbalancer.api.ServiceLoadBalancerApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Random;
@SPIClass
public class RamdomLoadbalancer implements ServiceLoadBalancerApi {
private static final Logger LOGGER = LoggerFactory.getLogger(RamdomLoadbalancer.class);
@Override
public T select(List services) {
LOGGER.info("这是随机负载均衡策略..........................");
if(services == null || services.isEmpty()) {
return null;
}
Random random = new Random();
int index = random.nextInt(services.size());
return services.get(index);
}
}
然后在 resources目录下新建一个文件
文件内容如下
random=com.xpc.rpc.loadbalancer.random.RamdomLoadbalancer
修改 xpc-rpc-register-common 下的com.xpc.rpc.register.common.config.RegisterConfig,新增一个配置项
/**
* 负载均衡类型
*/
private String loadbalancerType;
修改 xpc-rpc-consumer-common下的 com.xpc.rpc.consumer.config.RpcConsumerConfig,新增一个配置项
/**
* 负载均衡类型
*/
private String loadbalancerType;
修改 xpc-rpc-register-common 下的 com.xpc.rpc.register.common.RegisterService
在服务发现接口新增一个负责均衡类型参数
package com.xpc.rpc.register.common;
import com.xpc.rpc.protocol.meta.ServiceMeta;
import com.xpc.rpc.register.common.config.RegisterConfig;
public interface RegisterService {
/**
* 服务注册
* @param serviceMeta 服务注册元数据
*/
void register(ServiceMeta serviceMeta) throws Exception;
/**
* 服务发现
* @param serviceName
* @loadbalancerType: 负载均衡策略类型
* @return
* @throws Exception
*/
ServiceMeta discovery(String serviceName,String loadbalancerType) throws Exception;
/**
* 初始化方法
*/
void init(RegisterConfig registerConfig);
}
修改它的实现方法
package com.xpc.rpc.register.zookeeper;
import com.xpc.rpc.loadbalancer.api.ServiceLoadBalancerApi;
import com.xpc.rpc.protocol.meta.ServiceMeta;
import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.config.RegisterConfig;
import com.xpc.rpc.spi.loader.ExtensionLoader;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
/**
* zookeeper注册中心功能实现
*/
public class ZookeeperRegisterServiceImpl implements RegisterService {
//其它代码省略
@Override
public ServiceMeta discovery(String interfaceName,String loadbalancerType) throws Exception {
Collection serviceInstances = serviceDiscovery.queryForInstances(interfaceName);
//通过SPI机制以及负载均衡类型获取对应的负载均衡策略
ServiceLoadBalancerApi serviceLoadBalancerApi = ExtensionLoader.getExtension(ServiceLoadBalancerApi.class, loadbalancerType);
ServiceInstance serviceInstance = (ServiceInstance)serviceLoadBalancerApi.select((List) serviceInstances);
//返回一个ServiceData
return serviceInstance.getPayload();
}
}
测试
先启动服务提供者服务 xpc-rpc-web-provider
在服务消费者服务 xpc-rpc-web-consumer 的 application.yml新增负载均衡类型
xpc:
registerAddress: 127.0.0.1:2181
registerType: zookeeper
registerPort: 21770
packageName: com.xpc
loadbalancerType: random
server:
port: 8081
启动服务消费者服务,然后在浏览器上访问 http://localhost:8081/consumer
可以看到已经通过我们的随机负载均衡策略去获取一个服务了