Netty是如何解析Redis的RESP协议—响应篇

2024年 5月 16日 50.5k 0

这篇是响应篇,一起来看看  RedisDecoderTest 中,是怎么模拟 client-cli 接受处理 server 响应的👇

RedisDecoderTest

public class RedisDecoderTest {
    public static void main(String[] args) {
         EmbeddedChannel channel = newChannel(false);


        System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
        System.out.println(channel.writeInbound(byteBufOf("\n")));


        RedisMessage msg = channel.readInbound();
        System.out.println(msg instanceof FullBulkStringRedisMessage);

        String bytes = stringOf(((FullBulkStringRedisMessage) msg).content());
        System.out.println(bytes);


        ReferenceCountUtil.release(msg);

        channel.finish();
    }
    private static EmbeddedChannel newChannel(boolean decodeInlineCommands) {
        return new EmbeddedChannel(
                new RedisDecoder(decodeInlineCommands),
                new RedisBulkStringAggregator(),
                new RedisArrayAggregator());
    }
}

图解

这里的重点就是这 3 个 ChannelInboundHandler 了。

Netty是如何解析Redis的RESP协议—响应篇-1图片

具备 decode 能力 👇

Netty是如何解析Redis的RESP协议—响应篇-2图片

下面进入源码解读:

何时调用到 decode 方法

当进行 channelRead  时进行 decode,比如  MessageToMessageDecoder  👇

Netty是如何解析Redis的RESP协议—响应篇-3图片

RedisDecoder

里面定义了 5 种 State

Netty是如何解析Redis的RESP协议—响应篇-4图片

比如上面例子中,传输的  $6\r\nfoobar\r\n  ,就属于 RESP 协议中的 Bulk strings  大字符串,需要解析出 length 和 content,格式如下 :

$\r\n\r\n
比如
$5\r\nhello\r\n
$0\r\n\r\n

关键步骤

Netty是如何解析Redis的RESP协议—响应篇-5图片

decode 时,由于默认的 state 都是  DECODE_TYPE ,所以会先调用 decodeType 方法。

Netty是如何解析Redis的RESP协议—响应篇-6图片

decodeType

看看是不是 inline 的,默认是 false,我们也是设置了 false。

Netty是如何解析Redis的RESP协议—响应篇-7图片

decodeLength

Netty是如何解析Redis的RESP协议—响应篇-8图片

这里可以看到官网 Fast to parse 的影子。

Netty是如何解析Redis的RESP协议—响应篇-9图片

Netty是如何解析Redis的RESP协议—响应篇-10图片

decodeBulkString

创建 BulkStringHeaderRedisMessage,再把 state 切换到 DECODE_BULK_STRING_CONTENT ,最后调用 decodeBulkStringContent 。

Netty是如何解析Redis的RESP协议—响应篇-11图片

decodeBulkStringContent

创建 DefaultBulkStringRedisContent,并添加到 out 这个 list 中(2个)

Netty是如何解析Redis的RESP协议—响应篇-12图片

接着,就来到第二个 handler 了 ,RedisBulkStringAggregator

RedisBulkStringAggregator

起到一个聚合的作用,将消息包装成 FullBulkStringRedisMessage。

Netty是如何解析Redis的RESP协议—响应篇-13图片

这个 decode 方法超过 100 行了,就粗略讲一下。

在上面的方法中,我们往 out 中添加了 BulkStringHeaderRedisMessage 和 DefaultBulkStringRedisContent 这两个。

Netty是如何解析Redis的RESP协议—响应篇-14图片

消息头处理

先处理 BulkStringHeaderRedisMessage ,

Netty是如何解析Redis的RESP协议—响应篇-15图片

包装成 FullBulkStringRedisMessage 。

Netty是如何解析Redis的RESP协议—响应篇-16图片

消息体处理

Netty是如何解析Redis的RESP协议—响应篇-17图片

appendPartialContent,把这个 ByteBuf 整合到 CompositeByteBuf 中。

Netty是如何解析Redis的RESP协议—响应篇-18图片

aggregate,扩展方法,目前是空实现。

最后,判断是不是消息尾

Netty是如何解析Redis的RESP协议—响应篇-19图片

到了这里,handler 就处理完了,因为这个消息不是数组类型的,用不到 RedisArrayAggregator 。

第二次 writeInbound

上面代码中共调用了两次 writeInbound

System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
 System.out.println(channel.writeInbound(byteBufOf("\n")));

第二次时,会把之前的 bytebuf 拿出来计算下。

Netty是如何解析Redis的RESP协议—响应篇-20图片

可以看到,oldBytes 是 \r ,newBytes 则是 \n ,重新组合成新的 ByteBuf。

Netty是如何解析Redis的RESP协议—响应篇-21图片

这样才能去创建这个 DefaultLastBulkStringRedisContent

Netty是如何解析Redis的RESP协议—响应篇-22图片

进而完成  RedisBulkStringAggregator 中的 last 条件分支。

Netty是如何解析Redis的RESP协议—响应篇-23图片

最后消息被包装成 FullBulkStringRedisMessage。

尾节点  TailContext

经过上面的层层处理,foobar 这个 FullBulkStringRedisMessage 消息是怎么存到 EmbeddedChannel 中呢?

可以看到这里继承了 DefaultChannelPipeline,并重写了 onUnhandledInboundMessage 方法。

Netty是如何解析Redis的RESP协议—响应篇-24图片

DefaultChannelPipeline 中有尾节点 TailContext,它会去调用这个 onUnhandledInboundMessage 。

Netty是如何解析Redis的RESP协议—响应篇-25图片

进而将消息存到队列中。

Netty是如何解析Redis的RESP协议—响应篇-26图片

最后,readInbound 就是从里面 poll 出来这个消息,再进行打印等操作即可。

Netty是如何解析Redis的RESP协议—响应篇-27图片

官方例子

我从 Netty 的 example 里 CV 了一份,大家可以快速上手。

使用时,主要还是注意这个 inbound ,outbound 的顺序问题(如图)。

Netty是如何解析Redis的RESP协议—响应篇-28图片

/**
 * Simple Redis client that demonstrates Redis commands against a Redis server.
 */
public class RedisClient {
    private static final String HOST = System.getProperty("host", "192.168.200.128");
    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new RedisDecoder());
                     p.addLast(new RedisBulkStringAggregator());
                     p.addLast(new RedisArrayAggregator());
                     p.addLast(new RedisEncoder());
                     p.addLast(new RedisClientHandler());
                 }
             });

            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();

            // Read commands from the stdin.
            System.out.println("Enter Redis commands (quit to end)");
            ChannelFuture lastWriteFuture = null;
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                final String line = input != null ? input.trim() : null;
                if (line == null || "quit".equalsIgnoreCase(line)) { // EOF or "quit"
                    ch.close().sync();
                    break;
                } else if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                // Sends the received line to the server.
                lastWriteFuture = ch.writeAndFlush(line);
                lastWriteFuture.addListener(new GenericFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            System.err.print("write failed: ");
                            future.cause().printStackTrace(System.err);
                        }
                    }
                });
            }

            // Wait until all messages are flushed before closing the channel.
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}


/**
 * An example Redis client handler. This handler read input from STDIN and write output to STDOUT.
 */
public class RedisClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        String[] commands = ((String) msg).split("\\s+");
        List children = new ArrayList(commands.length);
        for (String cmdString : commands) {
            children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
        }
        RedisMessage request = new ArrayRedisMessage(children);
        ctx.write(request, promise);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RedisMessage redisMessage = (RedisMessage) msg;
        printAggregatedRedisResponse(redisMessage);
        ReferenceCountUtil.release(redisMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }

    private static void printAggregatedRedisResponse(RedisMessage msg) {
        if (msg instanceof SimpleStringRedisMessage) {
            System.out.println(((SimpleStringRedisMessage) msg).content());
        } else if (msg instanceof ErrorRedisMessage) {
            System.out.println(((ErrorRedisMessage) msg).content());
        } else if (msg instanceof IntegerRedisMessage) {
            System.out.println(((IntegerRedisMessage) msg).value());
        } else if (msg instanceof FullBulkStringRedisMessage) {
            System.out.println(getString((FullBulkStringRedisMessage) msg));
        } else if (msg instanceof ArrayRedisMessage) {
            for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
                printAggregatedRedisResponse(child);
            }
        } else {
            throw new CodecException("unknown message type: " + msg);
        }
    }

    private static String getString(FullBulkStringRedisMessage msg) {
        if (msg.isNull()) {
            return "(null)";
        }
        return msg.content().toString(CharsetUtil.UTF_8);
    }
}

结尾

这篇比请求篇稍微复杂些,还有 TailContext 这个隐藏的细节。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论