大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16
前言
在前面一章我们已经实现了以zookeeper的注册中心的基本功能,并实现了 服务注册,服务发现二个接口。接下来我们就是要将服务提供者注册到注册中心。
实现
在之前的服务提供者的测试类中,我们扫描的包路径是写死的,其实在最后整合springboot的时候,这些配置我们肯定得从配置文件中去动态获取的,所以接下来我们首先就是要修改配置相关的信息
package com.xpc.test.netty;
import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;
public class ProviderTest {
@Test
public void startNetty() {
BaseServer baseServer = new BaseServer("com.xpc");
baseServer.startNettyServer();
}
}
- com.xpc.rpc.register.common.config.RegisterConfig
package com.xpc.rpc.register.common.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* 注册的基本配置类,需要从项目的配置文件中读取
*/
@ConfigurationProperties(prefix = "xpc")
public class RegisterConfig {
/**
* 注册地址
*/
private String registerAddress;
/**
* 注册类型
*/
private String registerType;
/**
* Netty监听端口
*/
private Integer registerPort;
/**
* 本机的IP地址
*/
private String serverAddress;
/**
* 服务名称
*/
private String applicationName;
/**
* 项目服务的监听端口
*/
private Integer serverPort;
/**
* 扫描的包路径
*/
private String packageName;
}
- 新建配置自动化配置的配置类 com.xpc.rpc.register.common.config.RegisterAutoConfiguration
package com.xpc.rpc.register.common.config;
import com.xpc.rpc.register.common.cache.RegisterConfigCacheManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
@EnableConfigurationProperties(RegisterConfig.class)
public class RegisterAutoConfiguration implements InitializingBean {
@Resource
private RegisterConfig registerConfig;
@Override
public void afterPropertiesSet() throws Exception {
//将配置类缓存起来,后续会使用到
new RegisterConfigCacheManager(registerConfig);
}
}
在 xpc-rpc-register-common项目下的resouorces目录下新建一个目录 META-INF,然后创建一个 spring.factories文件。文件内容如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.xpc.rpc.register.common.config.RegisterAutoConfiguration
修改服务提供者的启动类:com.xpc.rpc.provider.server.base.BaseServer
package com.xpc.rpc.provider.server.base;
import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.common.utils.RemotingUtil;
import com.xpc.rpc.provider.handler.RpcProviderHandler;
import com.xpc.rpc.provider.scanner.service.DubboServiceScanner;
import com.xpc.rpc.provider.server.api.Server;
import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.cache.RegisterConfigCacheManager;
import com.xpc.rpc.register.common.config.RegisterConfig;
import com.xpc.rpc.register.zookeeper.ZookeeperRegisterServiceImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class BaseServer implements Server {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseServer.class);
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap bootstrap;
/**
* @RpcService标注的类对应的接口的class
*/
protected Map handlerMap = new HashMap();
private RegisterService registerService;
private RegisterConfig registerConfig;
public BaseServer() {
//从缓存中获取配置类信息
this.registerConfig = RegisterConfigCacheManager.getRegisterConfig();
}
@Override
public void startNettyServer() {
registerService = new ZookeeperRegisterServiceImpl();
registerService.init(registerConfig);
try {
this.handlerMap = DubboServiceScanner.doScanDubboServiceByPackages(this.registerConfig.getPackageName(),registerService,registerConfig);
} catch (Exception e) {
LOGGER.error("scan @DubboService error: {}",e);
}
/**
* 性能优化
*/
if(useEpoll()) {
bossGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup();
}else {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
}
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//我们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
//我们自己实现的处理器
pipeline.addLast(new RpcProviderHandler(handlerMap));
}
});
try {
ChannelFuture channelFuture = bootstrap.bind(registerConfig.getServerAddress(), registerConfig.getRegisterPort()).sync();
LOGGER.info("Netty 服务端启动成功............");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.error("Netty 服务端启动失败:{}",e);
}finally {
shutDown();
}
}
@Override
public void shutDown() {
if(bossGroup != null) {
bossGroup.shutdownGracefully();
}
if(workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
}
}
测试
我们新建一个子模块 xpc-rpc-web,并创建对应的项目
新建 xpc-rpc-web-interfaces,然后创建一个接口 com.xpc.interfaces.UserService
package com.xpc.interfaces;
public interface UserService {
String hello(String name);
}
创建服务提供者 xpc-rpc-web-provider,pom.xml文件如下
org.springframework.boot
spring-boot-starter-parent
2.2.6.RELEASE
4.0.0
xpc-rpc-web-provider
8
8
UTF-8
2.2.6.RELEASE
org.springframework.boot
spring-boot-dependencies
${spring.boot.version}
pom
import
org.springframework.boot
spring-boot-starter-test
provided
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-tomcat
org.springframework.boot
spring-boot-starter-logging
org.springframework.boot
spring-boot-starter-undertow
com.xpc
xpc-rpc-provider
1.0-SNAPSHOT
com.xpc
xpc-rpc-wen-interfaces
1.0-SNAPSHOT
在 resources目录下创建 application.yml,内容如下:
xpc:
registerAddress: 127.0.0.1:2181
registerType: zookeeper
registerPort: 21770
packageName: com.xpc
创建启动类:
package com.xpc;
import com.xpc.rpc.provider.server.base.BaseServer;
import com.xpc.rpc.register.common.config.RegisterConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class,args);
}
@Resource
private RegisterConfig registerConfig;
@GetMapping("/test")
public String test() {
BaseServer baseServer = new BaseServer();
baseServer.startNettyServer();
return null;
}
}
启动之后,在浏览器访问:localhost:8080/test
我们可以发现服务提供者已经成功启动了
接下来我们看下服务提供者是否已经注册到zookeeper中去了
可以看到,服务已经成功注册到zookeeper中了