手撸RPC框架 自定义编解码实现

2023年 7月 13日 98.8k 0

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在上一章中我们设计了自己的网络传输协议,并且实现了。但是现在的服务端只能接收String类型的协议。也就是Netty自带的。如果我们此时直接使用是会有问题的。所以我们还需要实现自己的编解码器

设计

image.png

服务消费者通过自定义网络传输协议发送请求,首先会通过编码器的处理,然后经过网络传输到达服务提供者,服务提供者通过解码器处理之后,拿到我们能够处理的数据。之后返回响应数据

实现

创建子项目 xpc-rpc-codec,pom.xml文件如下



    
        xpc-rpc
        com.xpc
        1.0-SNAPSHOT
    
    4.0.0

    xpc-rpc-codec


    
        
            com.xpc
            xpc-rpc-protocol
            1.0-SNAPSHOT
        
        
            com.xpc
            xpc-rpc-serialization-jdk
            1.0-SNAPSHOT
        
        
            com.xpc
            xpc-rpc-common
            1.0-SNAPSHOT
        
    

创建一个通用的接口 com.xpc.rpc.codec.RpcCoder

package com.xpc.rpc.codec;

import com.xpc.rpc.common.serialization.Serialization;

/**
 * 通用接口
 */
public interface RpcCoder {

    /**
     * 获取具体的序列化接口
     * 后续会使用SPI机制进行扩展,现在默认给一个JDK的序列化方式
     */
    Serialization getJdkSerialization();
}

创建解码器:com.xpc.rpc.codec.RpcDecoder

package com.xpc.rpc.codec;

import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.common.serialization.Serialization;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import com.xpc.rpc.serialization.jdk.JdkSerialization;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 解码器
 */
public class RpcDecoder extends ByteToMessageDecoder implements RpcCoder {


    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        if(in.readableBytes() < 16) {
            return;
        }
        //拿到请求消息类型
        int msgType = in.readInt();

        //拿到请求体长度
        int bodyLen = in.readInt();

        //拿到请求id
        long requestId = in.readLong();

        if(in.readableBytes() < bodyLen) {
            //表示读取的数据还不够
            in.resetReaderIndex();
            return;
        }

        //获取请求体
        byte[] data = new byte[bodyLen];
        in.readBytes(data);

        RpcHeader rpcHeader = new RpcHeader();
        rpcHeader.setRequestId(requestId);
        rpcHeader.setBodyLen(bodyLen);
        rpcHeader.setMsgType(msgType);

        ProtocolMessage protocolMessage = new ProtocolMessage();
        protocolMessage.setRpcHeader(rpcHeader);

        //获取序列化接口,后续会使用SPI机制扩展
        Serialization jdkSerialization = getJdkSerialization();
        RpcMsgType rpcMsgType = RpcMsgType.findByType(msgType);
        switch (rpcMsgType){
            case REQUEST:
                RpcRequest request = jdkSerialization.deserialize(data, RpcRequest.class);
                protocolMessage.setT(request);
                break;
            case RESPONSE:
                RpcResponse response = jdkSerialization.deserialize(data, RpcResponse.class);
                protocolMessage.setT(response);
                break;
        }

        //更新读索引
        in.markReaderIndex();

        //最后通过管道写出去
        out.add(protocolMessage);
    }

    @Override
    public Serialization getJdkSerialization() {
        return new JdkSerialization();
    }
}

对于该类中第一个判断为什么是16,这是解释一下,一个完整的请求包括请求头和请求体,我们请求头包括 消息类型,请求体长度,请求唯一ID,int是4字节,long占8字节,所以请求头一共占16字节,那还有请求体呢? 其实一个请求的请求体为空也是有可能的,也就是 bodyLen = 0的时候,所以一个完整的请求大小至少要等于 16字节

/**
 * 请求类型,是普通请求,或者是心跳消息等
 */
private int msgType;

/**
 * 消息体的长度
 */
private int bodyLen;

/**
 * 请求的唯一ID
 */
private Long requestId;

创建编码器:com.xpc.rpc.codec.RpcEncoder

package com.xpc.rpc.codec;

import com.xpc.rpc.common.serialization.Serialization;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.serialization.jdk.JdkSerialization;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;


public class RpcEncoder extends MessageToByteEncoder implements RpcCoder {


    @Override
    public Serialization getJdkSerialization() {
        return new JdkSerialization();
    }


    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage protocolMessage, ByteBuf out) throws Exception {
        out.writeInt(protocolMessage.getRpcHeader().getMsgType());
        byte[] bytes = getJdkSerialization().serialize(protocolMessage.getT());
        out.writeInt(bytes.length);
        out.writeLong(protocolMessage.getRpcHeader().getRequestId());
        out.writeBytes(bytes);
    }
}

然后将服务端和客户端的编解码器都替换成我们自己实现的编解码器

服务端:com.xpc.rpc.provider.server.base.BaseServer,修改startNettyServer()方法

@Override
public void startNettyServer() {
    
    //其它代码省略
    bootstrap.group(bossGroup,workerGroup)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG,128)
            .childOption(ChannelOption.SO_KEEPALIVE,true)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    
                    //Netty自带的编解码器
                    //pipeline.addLast(new StringDecoder());
                    //pipeline.addLast(new StringEncoder());
                    
                    //替换成我们自己的编解码器
                    pipeline.addLast(new RpcDecoder());
                    pipeline.addLast(new RpcEncoder());
                    //我们自己实现的处理器
                    pipeline.addLast(new RpcProviderHandler(handlerMap));
                }
            });

    try {
        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();
        LOGGER.info("Netty 服务端启动成功............");
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        LOGGER.error("Netty 服务端启动失败:{}",e);
    }finally {
        shutDown();
    }
}

修改com.xpc.rpc.provide.handler.RpcProviderHandler

package com.xpc.rpc.provider.handler;

import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class RpcProviderHandler extends SimpleChannelInboundHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);

    private Map handlerMap;

    public RpcProviderHandler(Map handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage msg) throws Exception {
        LOGGER.info("className: {}",msg.getT().getClassName());
        LOGGER.info("method:{}",msg.getT().getMethodName());
        LOGGER.info("ParameterTypes:{}",msg.getT().getParameterTypes());
        LOGGER.info("Parameters:{}",msg.getT().getParameters());

        //写回响应数据
        ProtocolMessage protocolMessage = new ProtocolMessage();
        RpcHeader rpcHeader = msg.getRpcHeader();
        rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType());
        protocolMessage.setRpcHeader(rpcHeader);

        RpcResponse response = new RpcResponse();
        response.setCode(200);
        response.setData("success");
        protocolMessage.setT(response);
        ctx.writeAndFlush(protocolMessage);
    }
}

修改 com.xpc.rpc.consumer.RpcConsumer 的构造方法方法

public RpcConsumer() {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();

//Netty自带的编解码器
//pipeline.addLast(new StringDecoder());
//pipeline.addLast(new StringEncoder());

//替换成我们自己的编解码器
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
pipeline.addLast(new RpcConsumerHandler());
}
});
try {
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync();
channelFuture.addListener(new GenericFutureListener

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论