写在前面
本文看下如何使用redis来实现一个类似于redis官方提供的redis-cli.exe的客户端工具。
1:用到的模块
主要需要用到netty针对redis的编解码模块,可以解析redis的协议,从而可以实现和redis交互的功能。
2:正文
首先来定义客户端类:
package com.dahuyou.netty.redis.cli;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;import javax.swing.plaf.synth.SynthRadioButtonMenuItemUI;
import java.io.BufferedReader;
import java.io.InputStreamReader;public class RedisClient {String host; // 目标主机int port; // 目标主机端口public RedisClient(String host,int port){this.host = host;this.port = port;}public void start() throws Exception{EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new RedisClientInitializer());Channel channel = bootstrap.connect(host, port).sync().channel();System.out.println(" connected to host : " + host + ", port : " + port);System.out.println(" type redis's command to communicate with redis-server or type 'quit' to shutdown ");BufferedReader in = new BufferedReader(new InputStreamReader(System.in));ChannelFuture lastWriteFuture = null;for (;;) {Thread.sleep(1000);RedisClientHandler.serialNum = 0;System.out.println(host + ":" + port + ">");String s = in.readLine();if(s.equalsIgnoreCase("quit")) {break;}
// System.out.print(">");lastWriteFuture = channel.writeAndFlush(s);lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {System.err.print("write failed: ");future.cause().printStackTrace(System.err);}}});}if (lastWriteFuture != null) {lastWriteFuture.sync();}System.out.println(" bye ");}finally {group.shutdownGracefully();}}public static void main(String[] args) throws Exception{
// RedisClient client = new RedisClient("192.168.56.10",6379);RedisClient client = new RedisClient("127.0.0.1",6379);client.start();}}
这里redis server其实就是一个tcp server的角色了。
在启动类中同一个for (;;) {
的死循环来等待用户录入信息,类似于redis-cli.exe的如下功能:
另外,通过RedisClientInitializer设置协议解析的编解码器,如下:
package com.dahuyou.netty.redis.cli;import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.redis.RedisArrayAggregator;
import io.netty.handler.codec.redis.RedisBulkStringAggregator;
import io.netty.handler.codec.redis.RedisDecoder;
import io.netty.handler.codec.redis.RedisEncoder;public class RedisClientInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new RedisDecoder());pipeline.addLast(new RedisBulkStringAggregator());pipeline.addLast(new RedisArrayAggregator());pipeline.addLast(new RedisEncoder());pipeline.addLast(new RedisClientHandler());}
}
RedisDecoder,RedisBulkStringAggregator,RedisArrayAggregator,RedisEncoder这几个类都是redis codec模块提供的编解码类,如下:
RedisClientHandler是我们自定义的业务处理类,源码如下:
package com.dahuyou.netty.redis.cli;import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.redis.*;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;import java.util.ArrayList;
import java.util.List;public class RedisClientHandler extends ChannelDuplexHandler {// 发送 redis 命令@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {String[] commands = ((String) msg).split("\\s+");List<RedisMessage> children = new ArrayList<>(commands.length);for (String cmdString : commands) {children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));}RedisMessage request = new ArrayRedisMessage(children);ctx.write(request, promise);}// 接收 redis 响应数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {RedisMessage redisMessage = (RedisMessage) msg;// 打印响应消息printAggregatedRedisResponse(redisMessage);// 是否资源ReferenceCountUtil.release(redisMessage);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.print("exceptionCaught: ");cause.printStackTrace(System.err);ctx.close();}private static void printAggregatedRedisResponse(RedisMessage msg) {if (msg instanceof SimpleStringRedisMessage) {System.out.println(((SimpleStringRedisMessage) msg).content());} else if (msg instanceof ErrorRedisMessage) {System.out.println(((ErrorRedisMessage) msg).content());} else if (msg instanceof IntegerRedisMessage) {System.out.println(((IntegerRedisMessage) msg).value());} else if (msg instanceof FullBulkStringRedisMessage) {System.out.println(getString((FullBulkStringRedisMessage) msg));} else if (msg instanceof ArrayRedisMessage) {for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {printAggregatedRedisResponse(child);}} else {throw new CodecException("unknown message type: " + msg);}}public static int serialNum = 0;private static String getString(FullBulkStringRedisMessage msg) {if (msg.isNull()) {return "(null)";}return ++serialNum + ") " +msg.content().toString(CharsetUtil.UTF_8);}}
运行测试:
写在后面
参考文章列表
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。 。
netty之导入源码到idea 。