`
QING____
  • 浏览: 2232169 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Redis编程实践【protocol】

 
阅读更多

    Redis Protocol即为client与server交互时,所使用的数据格式;符合格式的数据能够被server端解析并返回结果,client端如果按照格式要求既可以解析“结果”并将结构化数据反馈给调用者。有些时候,我们可以通过改造协议的方式构建redis-client套层或者server端的Proxy。

    Redis-Client与Server之间进行的任何通讯,均是通过普通的TCP链接进行,因为TCP通讯是面向字节流的,因此它和其他任何基于字节流的信息交互的平台一样,需要“protocol”(协议);任何protocol本身需要至少具备2种能力:1)字节码成帧策略 2)字符序列格式约束。

    “字节码成帧”被广泛的应用在基于字节流的网络交互中,以TCP通讯为例,在Socket通讯中所发送的数据(特别是长链接,持续交付packet的场景下),对于socket的一端都需要知道每个packet的“终止符”位置,以及packet中每个数据field的偏移量;只有这样,对于socket的数据的接收端才能将“无边界流数据”有效的转化成结构化/可读的字符序列。如下展示一个packet帧的结构,其中"[""]"只是为了标记参数:

    [magic-header][packet-bytes-length][field-name-bytes-length][field-name-bytes][field-data-bytes-length][field-data-bytes]

    实际数据大概为:

    [“magic-header”][10][4]["name".getBytes()][6]["012345".getBytes()]

    不同的设计者可能考虑成帧的方式不同,但是都需要描述当前packet字节的长度/filed顺序/每个filed的字节长度等。上述例子则表达:此packet需要以“magic-header”开头(主要用来防止字节流被意外破坏或者乱序,同时用来表示一个新帧的开始),此后的总字节长度为10,其中“name”这个filed名称占4个字节,“name”这个filed对应的数据占6个字节;最终我们还原成name=012345这么一个信息。

    “字符序列格式约束”即将字符串按照一定的规则进行解析,并获得有效数据;其中“规则”就是格式约束,对于符合规则的字符串才能被接受和实施,否则将会被丢弃。比如xml,json等等,只有符合格式约束的字符串才能被相应的引擎所解析。

    Redis协议非常简单且容易理解,request和reply数据都遵循同一个协议;对于client端的每个request,最终会被描述为一个command,command所包含的信息只会包括:指令名称 + 参数列表。那么对于server端的response,最终会被描述为一个result,那么result可能包括此次操作的状态(status)码、错误信息、结果内容等。协议就是为双端交互中的数据格式提供约束。

    1. Request部分:对于指令操作[command-name][arg...]最终将会转换成如下格式在网络中传输并交付给server端:

 

*[参数数量] \r\n
$[参数字节个数]\r\n
[参数字节序列]
...
##以“SET key value”指令为例:
*3	#表示有3个参数
$3	#表示“参数”有三个字节("SET"字符串为3个字节)
SET
$4
name
$5
01234


##流的方式
*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$5\r\n01234\r\n
   

 

    2.Reply部分:server端需要返回结果的类型,redis中,reply的首个字符用来表示结果的类型,最终以“\r\n”结束.

  • “+”:“状态”类型reply,对于无需实际数据返回的相应,只是用来表示此次操作成功与否,例如SET KEY-VALUE指令。比如“+OK”表示操作成功,如果不成功将会返回“-ERROR”。
  • “-”:“异常”信息类型reply,对于操作失败时,将会返回此类型的信息告知client端。
    lrange testtt w x
    -ERR value is not an integer or out of range
     在“-”之后为“错误类型”,此后为一个空格或者“\r\n”,然后为异常信息内容,内容可能有多行。
  • “:”:表示返回结果为integer类型,此结果只包括一个数字,也可以用来表示true/false的结果类型,比如INCR/DECR/EXISTS/SISMEMBER等指令;需要注意的是,integer结果也是按照“字符串”方式传输的,你不能按照“4个字节=integer”的思路去使用它。比如integer = 23456,那么“23456”实际上是5个字节
    exists fck-me
    :0
    exists fck-you
    :1
  • “$”:表示返回的结果为普通数据结果,格式为:[$][字节个数][\r\n][字节序列][\r\n],如果结果中存在$-1则表示当前请求的数据为“null”。
    get fck-you
    $3
    123
    
  • “*”:表示返回的结果为复合数据结果,结果中包括多个子数据集合。
    lrange testlist 0 -1
    *2
    $1
    b
    $1
    a
    
     
    Redis支持“行内指令”,我们可以通过telnet的方式,执行指令和查看协议:
qing@qing-tp:~$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
SET test 1   
+OK
incr test
:2
exist test
-ERR unknown command 'exist'
 
     3.协议与实践:在下文的代码中,我们将通过代码展示如何直接通过Socket-IO的方式直接与redis-server交互,可以比较直观的明白protocol数据格式;其实,可以认为它是一个java redis-client,不过我本人采取了和Jedis不同的请求处理手段,本人采取了“请求队列 + 同步”的方式进行,其实如下代码可以在极少的修改下,改变为“异步”的方式。在此需要提醒:protocol中的字节流需要UTF-8编码之后。
TestMain.java
public class TestMain {

	public static void main(String[] args){
		Client client = new Client("127.0.0.1", 6379);
		client.set("testset", "012xyz中国_?");
		System.out.println(client.get("testset"));
		List<String> list = client.lrange("testlist", 0, -1);
		for(String item : list){
			System.out.println("--:" + item);
		}
		System.out.println("incr:" + client.incr("testincr"));
	}
}
 
Client.java(完整代码,参见附件)
public class Client {
	
	private BlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();
	private Charset charset = Charset.forName("utf-8");
	private Handler handler;
	Client(String host,int port){
		handler = new Handler(host, port);
		handler.setDaemon(true);
		handler.start();
	}
	
	public void set(String key,String value){
		Request request = new Request(Command.SET, key,value);
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
		}catch(InterruptedException e){
			return;
		}
	}	

	
	public String get(String key){
		Request request = new Request(Command.GET, key);
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
			return reply.result;
			
		}catch(InterruptedException e){
			return null;
		}
	}
	
	public List<String> lrange(String key,int from,int to){
		Request request = new Request(Command.LRANGE, key,String.valueOf(from),String.valueOf(to));
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
			return reply.lresult;
			
		}catch(InterruptedException e){
			return null;
		}
	}
	
	public Integer incr(String key){
		Request request = new Request(Command.INCR,key);
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
			return Integer.valueOf(reply.result);
			
		}catch(InterruptedException e){
			return null;
		}
	}

	public void close(){
		handler.close();
	}
	
	class Handler extends Thread{
		Socket socket = null;
		boolean closed = false;
		BufferedReader is = null;
		OutputStream os = null;
		String host;
		int port;
		Handler(String host,int port){
			try{
				this.host = host;
				this.port = port;
				connect();
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		
		private void connect() throws IOException{
			socket = new Socket();
			SocketAddress addr = new InetSocketAddress(host,port);
			socket.setKeepAlive(true);
			//socket.setSoTimeout(10000);
			socket.setSoLinger(true,0);
			//socket.setReceiveBufferSize(1024);
			socket.setTcpNoDelay(true);
			socket.connect(addr,10000); //blocking
			is = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset));
			os = socket.getOutputStream();
		}
		
		public void close(){
			closed = true;
			this.interrupt();
		}
		
		
		private void write(Request request) throws IOException{
			os.write('*');
			String[] args = request.args;
			os.write(String.valueOf(args.length + 1).getBytes(charset));
			os.write('\r');
			os.write('\n');
			//*2
			os.write('$');
			byte[] cb = request.command.name().getBytes(charset);
			os.write(String.valueOf(cb.length).getBytes(charset));
			os.write('\r');
			os.write('\n');
			//$3
			os.write(cb);
			os.write('\r');
			os.write('\n');
			//GET
			for(String arg : args){
				byte[] ab = arg.getBytes(charset);
				os.write('$');
				os.write(String.valueOf(ab.length).getBytes(charset));
				os.write('\r');
				os.write('\n');
				os.write(ab);
				os.write('\r');
				os.write('\n');
			}
		}
		
		@Override
		public void run(){
			try{
				while(!closed){
					Request request = requests.take();
					try{
						write(request);
						char status = (char)is.read();
						Reply reply = new Reply();
						if(status != '-'){
							reply.success = true;
						}
						if(status == '+' || status == '-'){
							reply.message = read();
						}else if(status == '$'){
							reply.result = readString();
						}else if(status == '*'){
							reply.lresult = readMulti();
						}else if(status == ':'){
							reply.result = read();
						}else{
							request.reply = new Reply(-1);
							throw new RuntimeException("packet error..");
						}
						synchronized (request) {
							request.reply = reply;
							request.notifyAll();
						}
					}catch(Exception e){
						try{
							socket.close();
							this.connect();
						}catch(Exception ex){
							//
						}
						synchronized (request) {
							request.notifyAll();
						}
					}
				}
			}catch(InterruptedException e){
				try{
					closed = true;
					socket.close();
					for(Request request : requests){
						request.blocker.interrupt();
					}
					requests.clear();
				}catch(Exception ex){
					//
				}
			}
		}
		//read line
		private String read() throws IOException{
			StringBuilder sb = new StringBuilder();
			//\r\n必须互为成对
			//不能直接使用is.readline()
			boolean lfcr = false;
			while(true){
				char _char = (char)is.read();
				if(_char == -1){
					close();
					break;
				}
				//如果上一个字符为\r
				if(lfcr == true){
					if(_char == '\n'){
						break;
					}
					sb.append('\r');
					lfcr = false;
				}
				if(_char == '\r'){
					lfcr = true;
					continue;
				}
				sb.append(_char);
			}
			return sb.toString();
		}
		
		private List<String> readMulti() throws IOException{
			Integer size = Integer.valueOf(read());
			List<String> lresult = new ArrayList<String>();
			//eg: *3
			if(size > 0) {
				for(int i=0;i<size;i++){
					while(true){
						char _char = (char)is.read();//$3
						if(_char == '$'){
							lresult.add(readString());
							break;
						}
					}
				}
			}
			return lresult;
		}
		//such as:
		//$3
		//012
		private String readString() throws IOException{
			Integer size = Integer.valueOf(read());
			//-1 is null
			if(size > 0){
				return read();
			}
			return null;
		}
	}
}
 

   

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics