大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16
前言
在之前的章节中,我们已经实现了服务提供者的收发消息功能,并且服务提供者能够调用真实方法并返回响应结果。
但是我们发现,发送消息是在服务消费者注册的时候,发送了一条消息,这显然不符合我们的需求。
这一章我们完善下服务消费者发送消息的基础功能
实现
修改服务消费者:com.xpc.rpc.consumer.RpcConsumer
package com.xpc.rpc.consumer;
import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.request.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RpcConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);
private Bootstrap bootstrap;
private EventLoopGroup eventLoopGroup;
private Map handlerMap = new ConcurrentHashMap();
public RpcConsumer() {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//我们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
pipeline.addLast(new RpcConsumerHandler());
}
});
}
public void sendRequest(ProtocolMessage requestProtocolMessage) {
//先写死,后续会通过注册中心获取
String host = "127.0.0.1";
int port = 21778;
RpcRequest request = requestProtocolMessage.getT();
String key = buildHandlerMapKey(request.getClassName(), request.getMethodName());
RpcConsumerHandler consumerHandler;
if(handlerMap.containsKey(key)) {
consumerHandler = handlerMap.get(key);
Channel channel = consumerHandler.getChannel();
if(!channel.isOpen() || !channel.isActive()) {
consumerHandler = getConsumerHandler(key, host, port);
handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
}
}else {
consumerHandler = getConsumerHandler(key, host, port);
if(consumerHandler == null) {
throw new RuntimeException("");
}
handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
}
//发送消息
consumerHandler.sendRequest(requestProtocolMessage);
}
private RpcConsumerHandler getConsumerHandler(String key,String host, int port) {
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channelFuture.addListener(new GenericFutureListener