Netty是如何解析Redis的RESP协议——响应篇

上文请求篇中,通过给 channel 添加 RedisEncoder 来处理不同类型的 RedisMessage ,比如 简单字符串,大字符串 等。

这篇是响应篇,一起来看看  RedisDecoderTest 中,是怎么模拟 client-cli 接受处理 server 响应的👇

RedisDecoderTest

public class RedisDecoderTest {
    public static void main(String[] args) {
         EmbeddedChannel channel = newChannel(false);


        System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
        System.out.println(channel.writeInbound(byteBufOf("\n")));


        RedisMessage msg = channel.readInbound();
        System.out.println(msg instanceof FullBulkStringRedisMessage);

        String bytes = stringOf(((FullBulkStringRedisMessage) msg).content());
        System.out.println(bytes);


        ReferenceCountUtil.release(msg);

        channel.finish();
    }
    private static EmbeddedChannel newChannel(boolean decodeInlineCommands) {
        return new EmbeddedChannel(
                new RedisDecoder(decodeInlineCommands),
                new RedisBulkStringAggregator(),
                new RedisArrayAggregator());
    }
}

图解

这里的重点就是这 3 个 ChannelInboundHandler 了。

图片[1]-Netty是如何解析Redis的RESP协议——响应篇-编程社

具备decode能力

图片[2]-Netty是如何解析Redis的RESP协议——响应篇-编程社

下面进入源码解读

何时调用到decode方法

当进行 channelRead  时进行 decode,比如  MessageToMessageDecoder

图片[3]-Netty是如何解析Redis的RESP协议——响应篇-编程社

RedisDecoder

里面定义了 5 种 State

图片[4]-Netty是如何解析Redis的RESP协议——响应篇-编程社

比如上面例子中,传输的  $6\r\nfoobar\r\n  ,就属于 RESP 协议中的 Bulk strings  大字符串,需要解析出 length 和 content,格式如下

$<length>\r\n<data>\r\n
比如
$5\r\nhello\r\n
$0\r\n\r\n

关键步骤

图片[5]-Netty是如何解析Redis的RESP协议——响应篇-编程社

decode 时,由于默认的 state 都是  DECODE_TYPE ,所以会先调用 decodeType 方法。

图片[6]-Netty是如何解析Redis的RESP协议——响应篇-编程社

decodeType

看看是不是 inline 的,默认是 false,我们也是设置了 false。

图片[7]-Netty是如何解析Redis的RESP协议——响应篇-编程社

decodeLength

图片[8]-Netty是如何解析Redis的RESP协议——响应篇-编程社

这里可以看到官网 Fast to parse 的影子。

图片[9]-Netty是如何解析Redis的RESP协议——响应篇-编程社
图片[10]-Netty是如何解析Redis的RESP协议——响应篇-编程社

decodeBulkString

创建 BulkStringHeaderRedisMessage,再把 state 切换到 DECODE_BULK_STRING_CONTENT ,最后调用 decodeBulkStringContent 。

图片[11]-Netty是如何解析Redis的RESP协议——响应篇-编程社

decodeBulkStringContent

创建 DefaultBulkStringRedisContent,并添加到 out 这个 list 中(2个)

图片[12]-Netty是如何解析Redis的RESP协议——响应篇-编程社

接着,就来到第二个 handler 了 ,RedisBulkStringAggregator

RedisBulkStringAggregator

起到一个聚合的作用,将消息包装成 FullBulkStringRedisMessage。

图片[13]-Netty是如何解析Redis的RESP协议——响应篇-编程社

这个 decode 方法超过 100 行了,就粗略讲一下。

在上面的方法中,我们往 out 中添加了 BulkStringHeaderRedisMessage 和 DefaultBulkStringRedisContent 这两个。

图片[14]-Netty是如何解析Redis的RESP协议——响应篇-编程社

消息头处理

先处理 BulkStringHeaderRedisMessage ,

图片[15]-Netty是如何解析Redis的RESP协议——响应篇-编程社

包装成 FullBulkStringRedisMessage 。

图片[16]-Netty是如何解析Redis的RESP协议——响应篇-编程社

消息体处理

图片[17]-Netty是如何解析Redis的RESP协议——响应篇-编程社

appendPartialContent,把这个 ByteBuf 整合到 CompositeByteBuf 中。

图片[18]-Netty是如何解析Redis的RESP协议——响应篇-编程社

aggregate,扩展方法,目前是空实现。

最后,判断是不是消息尾

图片[19]-Netty是如何解析Redis的RESP协议——响应篇-编程社

到了这里,handler 就处理完了,因为这个消息不是数组类型的,用不到 RedisArrayAggregator 。

第二次 writeInbound

上面代码中共调用了两次 writeInbound

 System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
 System.out.println(channel.writeInbound(byteBufOf("\n")));

第二次时,会把之前的 bytebuf 拿出来计算下。

图片[20]-Netty是如何解析Redis的RESP协议——响应篇-编程社

可以看到,oldBytes 是 \r ,newBytes 则是 \n ,重新组合成新的 ByteBuf。

图片[21]-Netty是如何解析Redis的RESP协议——响应篇-编程社

这样才能去创建这个 DefaultLastBulkStringRedisContent

图片[22]-Netty是如何解析Redis的RESP协议——响应篇-编程社

进而完成  RedisBulkStringAggregator 中的 last 条件分支。

图片[23]-Netty是如何解析Redis的RESP协议——响应篇-编程社

最后消息被包装成 FullBulkStringRedisMessage。

尾节点  TailContext

经过上面的层层处理,foobar 这个 FullBulkStringRedisMessage 消息是怎么存到 EmbeddedChannel 中呢?

可以看到这里继承了 DefaultChannelPipeline,并重写了 onUnhandledInboundMessage 方法。

图片[24]-Netty是如何解析Redis的RESP协议——响应篇-编程社

DefaultChannelPipeline 中有尾节点 TailContext,它会去调用这个 onUnhandledInboundMessage 。

图片[25]-Netty是如何解析Redis的RESP协议——响应篇-编程社

进而将消息存到队列中。

图片[26]-Netty是如何解析Redis的RESP协议——响应篇-编程社

最后,readInbound 就是从里面 poll 出来这个消息,再进行打印等操作即可。

图片[27]-Netty是如何解析Redis的RESP协议——响应篇-编程社

官方例子

我从 Netty 的 example 里 CV 了一份,大家可以快速上手。

使用时,主要还是注意这个 inbound ,outbound 的顺序问题(如图)。

图片[28]-Netty是如何解析Redis的RESP协议——响应篇-编程社
/**
 * Simple Redis client that demonstrates Redis commands against a Redis server.
 */
public class RedisClient {
    private static final String HOST = System.getProperty("host", "192.168.200.128");
    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new RedisDecoder());
                     p.addLast(new RedisBulkStringAggregator());
                     p.addLast(new RedisArrayAggregator());
                     p.addLast(new RedisEncoder());
                     p.addLast(new RedisClientHandler());
                 }
             });

            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();

            // Read commands from the stdin.
            System.out.println("Enter Redis commands (quit to end)");
            ChannelFuture lastWriteFuture = null;
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                final String line = input != null ? input.trim() : null;
                if (line == null || "quit".equalsIgnoreCase(line)) { // EOF or "quit"
                    ch.close().sync();
                    break;
                } else if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                // Sends the received line to the server.
                lastWriteFuture = ch.writeAndFlush(line);
                lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            System.err.print("write failed: ");
                            future.cause().printStackTrace(System.err);
                        }
                    }
                });
            }

            // Wait until all messages are flushed before closing the channel.
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}


/**
 * An example Redis client handler. This handler read input from STDIN and write output to STDOUT.
 */
public class RedisClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        String[] commands = ((String) msg).split("\\s+");
        List<RedisMessage> children = new ArrayList<RedisMessage>(commands.length);
        for (String cmdString : commands) {
            children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
        }
        RedisMessage request = new ArrayRedisMessage(children);
        ctx.write(request, promise);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RedisMessage redisMessage = (RedisMessage) msg;
        printAggregatedRedisResponse(redisMessage);
        ReferenceCountUtil.release(redisMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }

    private static void printAggregatedRedisResponse(RedisMessage msg) {
        if (msg instanceof SimpleStringRedisMessage) {
            System.out.println(((SimpleStringRedisMessage) msg).content());
        } else if (msg instanceof ErrorRedisMessage) {
            System.out.println(((ErrorRedisMessage) msg).content());
        } else if (msg instanceof IntegerRedisMessage) {
            System.out.println(((IntegerRedisMessage) msg).value());
        } else if (msg instanceof FullBulkStringRedisMessage) {
            System.out.println(getString((FullBulkStringRedisMessage) msg));
        } else if (msg instanceof ArrayRedisMessage) {
            for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
                printAggregatedRedisResponse(child);
            }
        } else {
            throw new CodecException("unknown message type: " + msg);
        }
    }

    private static String getString(FullBulkStringRedisMessage msg) {
        if (msg.isNull()) {
            return "(null)";
        }
        return msg.content().toString(CharsetUtil.UTF_8);
    }
}

结尾

这篇比请求篇稍微复杂些,还有 TailContext 这个隐藏的细节。

© 版权声明
THE END
喜欢就支持一下吧
点赞5赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容