大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16
前言
在前面几章我们已经实现了服务提供者收发消息的功能了,但是作为服务提供者,是应该调用我们真实的方法来获取返回结果,然后在封装成响应信息来返回给服务消费者
本章我们就实现一下服务提供者调用真实方法的功能
开干
在 com.xpc.rpc.provider.server.base.BaseServer 类新增二个成员变量
/**
* @RpcService标注的类对应的接口的class
*/
protected Map handlerMap = new HashMap();
//扫描的包路径
private String packageName;
在 startNettyServer()方法中对handlerMap进行赋值
@Override
public void startNettyServer() {
try {
this.handlerMap = DubboServiceScanner.doScanDubboServiceByPackages(packageName);
} catch (Exception e) {
LOGGER.error("scan @DubboService error: {}",e);
}
//其它代码省略
bootstrap.group(bossGroup,workerGroup)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//我们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
//我们自己实现的处理器
//将handlerMap传入进去
pipeline.addLast(new RpcProviderHandler(handlerMap));
}
});
}
新增构造方法,对packageName进行赋值
public BaseServer(String packageName) {
this.packageName = packageName;
}
接下来就是改造服务提供者的自定义处理器了 com.xpc.rpc.provider.handler.RpcProviderHandler
package com.xpc.rpc.provider.handler;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.Map;
public class RpcProviderHandler extends SimpleChannelInboundHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);
private Map handlerMap;
public RpcProviderHandler(Map handlerMap) {
this.handlerMap = handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage protocolMessage) throws Exception {
RpcHeader rpcHeader = protocolMessage.getRpcHeader();
RpcRequest request = protocolMessage.getT();
//处理请求消息
ProtocolMessage protocolResponseMessage = handlerMessage(rpcHeader,request);
//写会响应结果
ctx.writeAndFlush(protocolResponseMessage);
}
/**
* 利用反射技术调用真实方法获取结果
* @param rpcHeader
* @param request
*/
private ProtocolMessage handlerMessage(RpcHeader rpcHeader, RpcRequest request) {
ProtocolMessage protocolMessage = new ProtocolMessage();
rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType());
protocolMessage.setRpcHeader(rpcHeader);
RpcResponse response = new RpcResponse();
String className = request.getClassName();
if(!handlerMap.containsKey(className)) {
throw new RuntimeException("no find class for " + className);
}
Object classBean = handlerMap.get(className);
Class beanClass = classBean.getClass();
Method method;
Object result = null;
try {
method = beanClass.getMethod(request.getMethodName(), request.getParameterTypes());
method.setAccessible(true);
result = method.invoke(classBean,request.getParameters());
response.setData(result);
response.setCode(200);
} catch (Exception e) {
response.setErrMsg(e.getMessage());
response.setCode(500);
}
protocolMessage.setT(response);
return protocolMessage;
}
}
测试
修改:com.xpc.test.scanner.DemoService,新增一个方法
package com.xpc.test.scanner;
public interface DemoService {
String hello(String name);
}
修改它的实现类:com.xpc.test.scanner.DemoServiceImpl
package com.xpc.test.scanner;
import com.xpc.rpc.annotation.DubboService;
@DubboService(interfaceClass = DemoService.class)
public class DemoServiceImpl implements DemoService{
@Override
public String hello(String name) {
return "hello " + name;
}
}
修改:com.xpc.consumer.handler.RpcConsumerHandler
package com.xpc.rpc.consumer.handler;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcConsumerHandler extends SimpleChannelInboundHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送一个请求
ProtocolMessage protocolMessage = new ProtocolMessage();
RpcHeader rpcHeader = new RpcHeader();
rpcHeader.setMsgType(RpcMsgType.REQUEST.getType());
rpcHeader.setRequestId(1L);
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName("com.xpc.test.scanner.DemoService");
rpcRequest.setMethodName("hello");
rpcRequest.setParameterTypes(new Class[]{String.class});
rpcRequest.setParameters(new Object[]{"coco"});
protocolMessage.setRpcHeader(rpcHeader);
protocolMessage.setT(rpcRequest);
ctx.writeAndFlush(protocolMessage);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage protocolMessage) throws Exception {
LOGGER.info("code: {}",protocolMessage.getT().getCode());
LOGGER.info("data: {}",protocolMessage.getT().getData());
}
}
先启动服务提供者:com.xpc.test.netty.ProviderTest
package com.xpc.test.netty;
import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;
public class ProviderTest {
@Test
public void startNetty() {
BaseServer baseServer = new BaseServer("com.xpc");
baseServer.startNettyServer();
}
}
然后启动服务消费者:com.xpc.test.netty.ConsumerTest
package com.xpc.test.netty;
import com.xpc.rpc.consumer.RpcConsumer;
import org.junit.Test;
public class ConsumerTest {
@Test
public void startConsumer() {
new RpcConsumer();
}
}
服务提供者日志:
服务消费者日志: