手撸RPC框架 完善消费者发送消息基础功能
大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧
本章源码地址:gitee.com/baojh123/se…
自定义注解 -> opt-01 服务提供者收发消息基础实现 -> opt-01 自定义网络传输协议的实现 -> opt-02 自定义编解码实现 -> opt-03 服务提供者调用真实方法实现 -> opt-04 完善服务消费者发送消息基础功能 -> opt-05 注册中心基础功能实现 -> opt-06 服务提供者整合注册中心 -> opt-07 服务消费者整合注册中心 -> opt-08 完善服务消费者接收响应结果 -> opt-09 服务消费者,服务提供者整合SpringBoot -> opt-10 动态代理屏蔽RPC服务调用底层细节 -> opt-10 SPI机制基础功能实现 -> opt-11 SPI机制扩展随机负载均衡策略 -> opt-12 SPI机制扩展轮询负载均衡策略 -> opt-13 SPI机制扩展JDK序列化 -> opt-14 SPI机制扩展JSON序列化 -> opt-15 SPI机制扩展protustuff序列化 -> opt-16
前言
在之前的章节中,我们已经实现了服务提供者的收发消息功能,并且服务提供者能够调用真实方法并返回响应结果。
但是我们发现,发送消息是在服务消费者注册的时候,发送了一条消息,这显然不符合我们的需求。
这一章我们完善下服务消费者发送消息的基础功能
实现
修改服务消费者:com.xpc.rpc.consumer.RpcConsumer
package com.xpc.rpc.consumer; import com.xpc.rpc.codec.RpcDecoder; import com.xpc.rpc.codec.RpcEncoder; import com.xpc.rpc.consumer.handler.RpcConsumerHandler; import com.xpc.rpc.protocol.ProtocolMessage; import com.xpc.rpc.protocol.request.RpcRequest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class RpcConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class); private Bootstrap bootstrap; private EventLoopGroup eventLoopGroup; private Map handlerMap = new ConcurrentHashMap(); public RpcConsumer() { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //我们自己的编解码器 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); pipeline.addLast(new RpcConsumerHandler()); } }); } public void sendRequest(ProtocolMessage requestProtocolMessage) { //先写死,后续会通过注册中心获取 String host = "127.0.0.1"; int port = 21778; RpcRequest request = requestProtocolMessage.getT(); String key = buildHandlerMapKey(request.getClassName(), request.getMethodName()); RpcConsumerHandler consumerHandler; if(handlerMap.containsKey(key)) { consumerHandler = handlerMap.get(key); Channel channel = consumerHandler.getChannel(); if(!channel.isOpen() || !channel.isActive()) { consumerHandler = getConsumerHandler(key, host, port); handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler); } }else { consumerHandler = getConsumerHandler(key, host, port); if(consumerHandler == null) { throw new RuntimeException(""); } handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler); } //发送消息 consumerHandler.sendRequest(requestProtocolMessage); } private RpcConsumerHandler getConsumerHandler(String key,String host, int port) { try { ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); channelFuture.addListener(new GenericFutureListener