手撸RPC框架 完善消费者发送消息基础功能

2023年 7月 12日 18.7k 0

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在之前的章节中,我们已经实现了服务提供者的收发消息功能,并且服务提供者能够调用真实方法并返回响应结果。

但是我们发现,发送消息是在服务消费者注册的时候,发送了一条消息,这显然不符合我们的需求。

这一章我们完善下服务消费者发送消息的基础功能

实现

修改服务消费者:com.xpc.rpc.consumer.RpcConsumer

package com.xpc.rpc.consumer;

import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.request.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RpcConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

private Bootstrap bootstrap;

private EventLoopGroup eventLoopGroup;

private Map handlerMap = new ConcurrentHashMap();

public RpcConsumer() {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//我们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
pipeline.addLast(new RpcConsumerHandler());
}
});
}

public void sendRequest(ProtocolMessage requestProtocolMessage) {
//先写死,后续会通过注册中心获取
String host = "127.0.0.1";
int port = 21778;
RpcRequest request = requestProtocolMessage.getT();
String key = buildHandlerMapKey(request.getClassName(), request.getMethodName());
RpcConsumerHandler consumerHandler;
if(handlerMap.containsKey(key)) {
consumerHandler = handlerMap.get(key);
Channel channel = consumerHandler.getChannel();
if(!channel.isOpen() || !channel.isActive()) {
consumerHandler = getConsumerHandler(key, host, port);
handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
}
}else {
consumerHandler = getConsumerHandler(key, host, port);
if(consumerHandler == null) {
throw new RuntimeException("");
}
handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
}
//发送消息
consumerHandler.sendRequest(requestProtocolMessage);
}

private RpcConsumerHandler getConsumerHandler(String key,String host, int port) {
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channelFuture.addListener(new GenericFutureListener

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论