上文请求篇中,通过给 channel 添加 RedisEncoder 来处理不同类型的 RedisMessage ,比如 简单字符串,大字符串 等。
这篇是响应篇,一起来看看 RedisDecoderTest 中,是怎么模拟 client-cli 接受处理 server 响应的👇
RedisDecoderTest
public class RedisDecoderTest {
public static void main(String[] args) {
EmbeddedChannel channel = newChannel(false);
System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
System.out.println(channel.writeInbound(byteBufOf("\n")));
RedisMessage msg = channel.readInbound();
System.out.println(msg instanceof FullBulkStringRedisMessage);
String bytes = stringOf(((FullBulkStringRedisMessage) msg).content());
System.out.println(bytes);
ReferenceCountUtil.release(msg);
channel.finish();
}
private static EmbeddedChannel newChannel(boolean decodeInlineCommands) {
return new EmbeddedChannel(
new RedisDecoder(decodeInlineCommands),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
}
}
图解
这里的重点就是这 3 个 ChannelInboundHandler 了。
具备decode能力
下面进入源码解读
何时调用到decode方法
当进行 channelRead 时进行 decode,比如 MessageToMessageDecoder
RedisDecoder
里面定义了 5 种 State
比如上面例子中,传输的 $6\r\nfoobar\r\n
,就属于 RESP 协议中的 Bulk strings 大字符串,需要解析出 length 和 content,格式如下
$<length>\r\n<data>\r\n
比如
$5\r\nhello\r\n
$0\r\n\r\n
关键步骤
decode 时,由于默认的 state 都是 DECODE_TYPE ,所以会先调用 decodeType 方法。
decodeType
看看是不是 inline 的,默认是 false,我们也是设置了 false。
decodeLength
这里可以看到官网 Fast to parse 的影子。
decodeBulkString
创建 BulkStringHeaderRedisMessage,再把 state 切换到 DECODE_BULK_STRING_CONTENT ,最后调用 decodeBulkStringContent 。
decodeBulkStringContent
创建 DefaultBulkStringRedisContent,并添加到 out 这个 list 中(2个)
接着,就来到第二个 handler 了 ,RedisBulkStringAggregator
RedisBulkStringAggregator
起到一个聚合的作用,将消息包装成 FullBulkStringRedisMessage。
这个 decode 方法超过 100 行了,就粗略讲一下。
在上面的方法中,我们往 out 中添加了 BulkStringHeaderRedisMessage 和 DefaultBulkStringRedisContent 这两个。
消息头处理
先处理 BulkStringHeaderRedisMessage ,
包装成 FullBulkStringRedisMessage 。
消息体处理
appendPartialContent,把这个 ByteBuf 整合到 CompositeByteBuf 中。
aggregate,扩展方法,目前是空实现。
最后,判断是不是消息尾
到了这里,handler 就处理完了,因为这个消息不是数组类型的,用不到 RedisArrayAggregator 。
第二次 writeInbound
上面代码中共调用了两次 writeInbound
System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
System.out.println(channel.writeInbound(byteBufOf("\n")));
第二次时,会把之前的 bytebuf 拿出来计算下。
可以看到,oldBytes 是 \r
,newBytes 则是 \n
,重新组合成新的 ByteBuf。
这样才能去创建这个 DefaultLastBulkStringRedisContent
进而完成 RedisBulkStringAggregator 中的 last 条件分支。
最后消息被包装成 FullBulkStringRedisMessage。
尾节点 TailContext
经过上面的层层处理,foobar
这个 FullBulkStringRedisMessage 消息是怎么存到 EmbeddedChannel 中呢?
可以看到这里继承了 DefaultChannelPipeline,并重写了 onUnhandledInboundMessage 方法。
DefaultChannelPipeline 中有尾节点 TailContext,它会去调用这个 onUnhandledInboundMessage 。
进而将消息存到队列中。
最后,readInbound 就是从里面 poll 出来这个消息,再进行打印等操作即可。
官方例子
我从 Netty 的 example 里 CV 了一份,大家可以快速上手。
使用时,主要还是注意这个 inbound ,outbound 的顺序问题(如图)。
/**
* Simple Redis client that demonstrates Redis commands against a Redis server.
*/
public class RedisClient {
private static final String HOST = System.getProperty("host", "192.168.200.128");
private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new RedisDecoder());
p.addLast(new RedisBulkStringAggregator());
p.addLast(new RedisArrayAggregator());
p.addLast(new RedisEncoder());
p.addLast(new RedisClientHandler());
}
});
// Start the connection attempt.
Channel ch = b.connect(HOST, PORT).sync().channel();
// Read commands from the stdin.
System.out.println("Enter Redis commands (quit to end)");
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
final String input = in.readLine();
final String line = input != null ? input.trim() : null;
if (line == null || "quit".equalsIgnoreCase(line)) { // EOF or "quit"
ch.close().sync();
break;
} else if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
continue;
}
// Sends the received line to the server.
lastWriteFuture = ch.writeAndFlush(line);
lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.err.print("write failed: ");
future.cause().printStackTrace(System.err);
}
}
});
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} finally {
group.shutdownGracefully();
}
}
}
/**
* An example Redis client handler. This handler read input from STDIN and write output to STDOUT.
*/
public class RedisClientHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
String[] commands = ((String) msg).split("\\s+");
List<RedisMessage> children = new ArrayList<RedisMessage>(commands.length);
for (String cmdString : commands) {
children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
}
RedisMessage request = new ArrayRedisMessage(children);
ctx.write(request, promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RedisMessage redisMessage = (RedisMessage) msg;
printAggregatedRedisResponse(redisMessage);
ReferenceCountUtil.release(redisMessage);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.print("exceptionCaught: ");
cause.printStackTrace(System.err);
ctx.close();
}
private static void printAggregatedRedisResponse(RedisMessage msg) {
if (msg instanceof SimpleStringRedisMessage) {
System.out.println(((SimpleStringRedisMessage) msg).content());
} else if (msg instanceof ErrorRedisMessage) {
System.out.println(((ErrorRedisMessage) msg).content());
} else if (msg instanceof IntegerRedisMessage) {
System.out.println(((IntegerRedisMessage) msg).value());
} else if (msg instanceof FullBulkStringRedisMessage) {
System.out.println(getString((FullBulkStringRedisMessage) msg));
} else if (msg instanceof ArrayRedisMessage) {
for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
printAggregatedRedisResponse(child);
}
} else {
throw new CodecException("unknown message type: " + msg);
}
}
private static String getString(FullBulkStringRedisMessage msg) {
if (msg.isNull()) {
return "(null)";
}
return msg.content().toString(CharsetUtil.UTF_8);
}
}
结尾
这篇比请求篇稍微复杂些,还有 TailContext 这个隐藏的细节。
暂无评论内容