`
halloffame
  • 浏览: 54646 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Apache Thrift 初学小讲(二)【一个简单示例】

阅读更多

一 生成代码

首先写一个接口定义文件ThriftTest.thrift,里面定义了一个User的struct和一个getUser接口,以java为例,cmd执行命令thrift --gen java ThriftTest.thrift,成功后会在ThriftTest.thrift文件所在的目录生成一个gen-java的文件夹,里面包含了生成的代码文件:gen-java\thrift\test\ThriftTest.java和gen-java\thrift\test\User.java。

 

接口定义文件ThriftTest.thrift:

namespace java thrift.test

struct User
{
    1: i32 id,
    2: string name
}

service ThriftTest
{
    User getUser(1: i32 id)
}

 

实现ThriftTest.thrift里的getUser接口(编写TestHandler类):

import org.apache.thrift.TException;

import thrift.test.ThriftTest;
import thrift.test.User;

public class TestHandler implements ThriftTest.Iface {

	@Override
	public User getUser(int id) throws TException {
		if (id == 2 ) {
			User user = new User();
			user.setId(2);
			user.setName("另外一个烟火");
			return user;
		}
		return null;
	}

}

 

二 编写server

 编写thrift的server端需要指定transport_type(通信方式),protocol_type(通信协议),server_type(服务器模式)。本例中实现了根据启动程序的命令行参数值来动态的指定transport_type,protocol_type和server_type。

 

1--transport_type本例中只列举使用了以下三种:

 

(1) buffered:使用经典的缓冲流Socket;

(2) framed:基于帧的方式的Socket,每个帧都是按照4字节的帧长加上帧的内容来组织,帧内容就是我们要收发的数据。读的时候按长度预先将整Frame数据读入Buffer,再从Buffer慢慢读取。写的时候,每次flush将Buffer中的所有数据写成一个Frame。framed这种方式有点类似于http协议的chunked编码

(3) fastframed:和framed相比是内存利用率更高的一个内存读写缓存区,它使用自动增长的byte[](不够长度才new),而不是每次都new一个byte[],提高了内存的使用率。framed的ReadBuffer每次读入Frame时都会创建新的byte[],WriteBuffer每次flush时如果大于初始1K也会重新创建byte[]。

 

2--protocol_type本例中只列举使用了以下三种:

 

(1) binary:二进制编码格式进行数据传输;

(2) json:使用JSON的数据编码协议进行数据传输;

(3) compact:高效率的,密集的二进制编码格式进行数据传输。

 

3--server_type本例中只列举使用了以下四种:

 

(1) simple:单线程阻塞io;

(2) thread-pool:多线程阻塞io;

(3) nonblocking:单条线程非阻塞io;

(4) threaded-selector:非阻塞io,有一条线程专门负责accept,若干条Selector线程处理网络IO,一个Worker线程池处理消息。

 

PS:

其它的transport_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\transport目录下的例子。

其它的protocol_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\protocol目录下的例子。 

其它的server_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\server目录下的例子。

 

server_type中有个叫THsHaServer,相比threaded-selector的区别是只有一条AcceptSelect线程处理关于网络的一切,一个Worker线程池处理消息。threaded-selector模式是目前Thrift提供的最高级的模式,对于大部分应用场景性能都不会差,因此,如果实在不知道选择哪种工作模式,使用threaded-selector就可以。

 

服务端TestServer.java文件:

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;

import thrift.test.ThriftTest;

public class TestServer {
	public static void main(String [] args) {
		try {
	        int port = 9090;
	        boolean ssl = false; //传输是否加密
	        String transport_type = "buffered"; //需要和客户端的一致才能正常通信
	        String protocol_type = "binary"; //需要和客户端的一致才能正常通信
	        String server_type = "thread-pool";
	        
	        try {
	        	//从启动程序的命令行参数获取一些值,包括protocol_type,server_type和transport_type等
	        	for (int i = 0; i < args.length; i++) {
	        		if (args[i].startsWith("--port")) {
	        			port = Integer.valueOf(args[i].split("=")[1]);
	        		} else if (args[i].startsWith("--server-type")) {
	        			server_type = args[i].split("=")[1];
	        			server_type.trim();
	        		} else if (args[i].startsWith("--port")) {
	        			port = Integer.parseInt(args[i].split("=")[1]);
	        		} else if (args[i].startsWith("--protocol")) {
	        			protocol_type = args[i].split("=")[1];
	        			protocol_type.trim();
	        		} else if (args[i].startsWith("--transport")) {
	        			transport_type = args[i].split("=")[1];
	        			transport_type.trim();
	        		} else if (args[i].equals("--ssl")) {
	        			ssl = true;
	        		} else if (args[i].equals("--help")) {
			            System.out.println("Allowed options:");
			            System.out.println("  --help\t\t\tProduce help message");
			            System.out.println("  --port=arg (=" + port + ")\tPort number to connect");
			            System.out.println("  --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed");
			            System.out.println("  --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact");
			            System.out.println("  --ssl\t\t\tEncrypted Transport using SSL");
			            System.out.println("  --server-type=arg (=" + server_type +")\n\t\t\t\tType of server: simple, thread-pool, nonblocking, threaded-selector");
			            System.exit(0);
	        		}
	        	}
	        } catch (Exception e) {
	        	System.err.println("Can not parse arguments! See --help");
	        	System.exit(1);
	        }
	
	        try {
	        	//检查传入的变量值是否正确
		        if (server_type.equals("simple")) {
		        } else if (server_type.equals("thread-pool")) {
		        } else if (server_type.equals("nonblocking")) {
		        	if (ssl == true) {
		        		throw new Exception("SSL is not supported over nonblocking servers!");
		        	}
		        } else if (server_type.equals("threaded-selector")) {
		        	if (ssl == true) {
		        		throw new Exception("SSL is not supported over nonblocking servers!");
		        	}
		        } else {
		        	throw new Exception("Unknown server type! " + server_type);
		        }
		        
		        if (protocol_type.equals("binary")) {
		        } else if (protocol_type.equals("json")) {
		        } else if (protocol_type.equals("compact")) {
		        } else {
		        	throw new Exception("Unknown protocol type! " + protocol_type);
		        }
		        if (transport_type.equals("buffered")) {
		        } else if (transport_type.equals("framed")) {
		        } else if (transport_type.equals("fastframed")) {
		        } else {
		        	throw new Exception("Unknown transport type! " + transport_type);
		        }
	        } catch (Exception e) {
	        	System.err.println("Error: " + e.getMessage());
	        	System.exit(1);
	        }
	
	        // Processor
	        TestHandler testHandler = new TestHandler(); //具体的业务逻辑类,实现ThriftTest.thrift里的getUser接口
	        //ThriftTest.Processor是生成的服务端代码
	        ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
	
	        // Protocol factory
	        TProtocolFactory tProtocolFactory = null; //指定的通信协议
	        if (protocol_type.equals("json")) {
	        	tProtocolFactory = new TJSONProtocol.Factory();
	        } else if (protocol_type.equals("compact")) {
	        	tProtocolFactory = new TCompactProtocol.Factory();
	        } else {
	        	tProtocolFactory = new TBinaryProtocol.Factory();
	        }
	
	        TTransportFactory tTransportFactory = null; //指定的通信方式
	
	        if (transport_type.equals("framed")) {
	        	tTransportFactory = new TFramedTransport.Factory();
	        } else if (transport_type.equals("fastframed")) {
	        	tTransportFactory = new TFastFramedTransport.Factory();
	        } else { // .equals("buffered") => default value
	        	tTransportFactory = new TTransportFactory();
	        }
	
	        TServer serverEngine = null; //指定的服务器模式	
	
	        if (server_type.equals("nonblocking") || server_type.equals("threaded-selector")) {
	        	// Nonblocking servers
	        	TNonblockingServerSocket tNonblockingServerSocket =
	        			new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port));
	
		        if (server_type.equals("nonblocking")) {
		        	// Nonblocking Server
		        	TNonblockingServer.Args tNonblockingServerArgs
		              	= new TNonblockingServer.Args(tNonblockingServerSocket);
		        	tNonblockingServerArgs.processor(testProcessor);
		        	tNonblockingServerArgs.protocolFactory(tProtocolFactory);
		        	tNonblockingServerArgs.transportFactory(tTransportFactory);
		
		        	serverEngine = new TNonblockingServer(tNonblockingServerArgs);
		        } else { // server_type.equals("threaded-selector")
		        	// ThreadedSelector Server
		        	TThreadedSelectorServer.Args tThreadedSelectorServerArgs
		              	= new TThreadedSelectorServer.Args(tNonblockingServerSocket);
		        	tThreadedSelectorServerArgs.processor(testProcessor);
		        	tThreadedSelectorServerArgs.protocolFactory(tProtocolFactory);
		        	tThreadedSelectorServerArgs.transportFactory(tTransportFactory);
		
		        	serverEngine = new TThreadedSelectorServer(tThreadedSelectorServerArgs);
		        }
	        } else {
	        	// Blocking servers	
	        	// SSL socket
	        	TServerSocket tServerSocket = null;
	        	if (ssl) {
	        		tServerSocket = TSSLTransportFactory.getServerSocket(port, 0);
	        	} else {
	        		tServerSocket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(port));
	        	}
	
	        	if (server_type.equals("simple")) {
	        		// Simple Server
	        		TServer.Args tServerArgs = new TServer.Args(tServerSocket);
	        		tServerArgs.processor(testProcessor);
	        		tServerArgs.protocolFactory(tProtocolFactory);
	        		tServerArgs.transportFactory(tTransportFactory);
	
	        		serverEngine = new TSimpleServer(tServerArgs);
	        	} else { // server_type.equals("threadpool")
	        		// ThreadPool Server
	        		TThreadPoolServer.Args tThreadPoolServerArgs
	        			= new TThreadPoolServer.Args(tServerSocket);
	        		tThreadPoolServerArgs.processor(testProcessor);
	        		tThreadPoolServerArgs.protocolFactory(tProtocolFactory);
	        		tThreadPoolServerArgs.transportFactory(tTransportFactory);
	
	        		serverEngine = new TThreadPoolServer(tThreadPoolServerArgs);
	        	}
	        }
	
	        // Run it
	        System.out.println("Starting the server on port " + port + "...");
	        System.out.println("transport_type:" + transport_type);
	        System.out.println("protocol_type:" + protocol_type);
	        System.out.println("server_type:" + server_type);
	        serverEngine.serve();
	
	    } catch (Exception x) {
	    	x.printStackTrace();
	    }
	    System.out.println("done.");
	}
}

 

三 编写client 

编写thrift的client 端需要指定transport_type(通信方式),protocol_type(通信协议),这两个需要跟服务端的一致才能正常通信。

 

客户端TestClient.java文件:

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import thrift.test.ThriftTest;
import thrift.test.User;

public class TestClient {
    public static void main(String [] args) {
	    String host = "localhost";
	    int port = 9090;
	    String protocol_type = "binary"; //需要和服务端的一致才能正常通信
	    String transport_type = "buffered"; //需要和服务端的一致才能正常通信
	    boolean ssl = false; //传输是否加密	
	    int socketTimeout = 1000;
	
	    try {
	    	//从启动程序的命令行参数获取一些值,包括protocol_type和transport_type等
	    	for (int i = 0; i < args.length; ++i) {
		        if (args[i].startsWith("--host")) {
		            host = args[i].split("=")[1];
		            host.trim();
		        } else if (args[i].startsWith("--port")) {
		            port = Integer.valueOf(args[i].split("=")[1]); 
		        } else if (args[i].equals("--timeout")) {
		            socketTimeout = Integer.valueOf(args[i].split("=")[1]);
		        } else if (args[i].startsWith("--protocol")) {
		            protocol_type = args[i].split("=")[1];
		            protocol_type.trim();
		        } else if (args[i].startsWith("--transport")) {
		            transport_type = args[i].split("=")[1];
		            transport_type.trim();
		        } else if (args[i].equals("--ssl")) {
		            ssl = true;
		        } else if (args[i].equals("--help")) {
		            System.out.println("Allowed options:");
		            System.out.println("  --help\t\t\tProduce help message"); 
		            System.out.println("  --host=arg (=" + host + ")\tHost to connect");
		            System.out.println("  --port=arg (=" + port + ")\tPort number to connect");
		            System.out.println("  --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed, http");
		            System.out.println("  --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact");
		            System.out.println("  --ssl\t\t\tEncrypted Transport using SSL");
		            System.exit(0);
		        }
	        }
	    } catch (Exception x) {
	        System.err.println("Can not parse arguments! See --help");
	        System.exit(1);
	    }
	
	    try {
	    	//检查传入的变量值是否正确
	        if (protocol_type.equals("binary")) {
	        } else if (protocol_type.equals("compact")) {
	        } else if (protocol_type.equals("json")) {
	        } else {
	            throw new Exception("Unknown protocol type! " + protocol_type); 
	        }
	        
	        if (transport_type.equals("buffered")) {
	        } else if (transport_type.equals("framed")) {
	        } else if (transport_type.equals("fastframed")) {
	        } else if (transport_type.equals("http")) {
	        } else {
	            throw new Exception("Unknown transport type! " + transport_type);
	        }
	        
	        if (transport_type.equals("http") && ssl == true) { //不支持https
	            throw new Exception("SSL is not supported over http.");
	        }
	    } catch (Exception e) {
	        System.err.println("Error: " + e.getMessage());
	        System.exit(1);
	    }
	
	    TTransport transport = null; //指定的通信方式
	
	    try {
	        if (transport_type.equals("http")) { //http的transport_type(通信方式)在下一节中写个例子
	        	String url = "http://" + host + ":" + port + "/service";
	        	transport = new THttpClient(url);
	        } else {
	        	TSocket socket = null;
		        if (ssl == true) {
		            socket = TSSLTransportFactory.getClientSocket(host, port, 0);
		        } else {
		            socket = new TSocket(host, port);
		        }
		        
		        socket.setTimeout(socketTimeout);
		        
		        transport = socket;
		        if (transport_type.equals("buffered")) {
		        } else if (transport_type.equals("framed")) {
		            transport = new TFramedTransport(transport);
		        } else if (transport_type.equals("fastframed")) {
		            transport = new TFastFramedTransport(transport);
		        }
	        }
	    } catch (Exception x) {
	        x.printStackTrace();
	        System.exit(1);
	    }
	
	    TProtocol tProtocol = null; //指定的通信协议
	    if (protocol_type.equals("json")) {
	        tProtocol = new TJSONProtocol(transport);
	    } else if (protocol_type.equals("compact")) {
	        tProtocol = new TCompactProtocol(transport);
	    } else {
	        tProtocol = new TBinaryProtocol(transport);
	    }
	
	    //ThriftTest.Client是生成的客户端代码
	    ThriftTest.Client testClient = new ThriftTest.Client(tProtocol);
    
	    try {
	        System.out.println("connect " + host + ":" + port);	
	        System.out.println("protocol_type:" + protocol_type);
	        System.out.println("transport_type:" + transport_type);
	        if (transport.isOpen() == false) {
		        try {
		            transport.open();
		        } catch (TTransportException ttx) {
		            ttx.printStackTrace();
		            System.out.println("Connect failed: " + ttx.getMessage());
		            System.exit(1);
		        }
	        }

	        User user = testClient.getUser(2); //getUser就是ThriftTest.thrift所定义的接口
	        System.out.println("名字:"+ user.getName());

	        transport.close();
	    } catch (Exception x) {
	        x.printStackTrace();
	        System.exit(1);
	    }

	    System.exit(0);
    }
}

 

四 运行结果

--服务端--

Starting the server on port 9090...

transport_type:fastframed

protocol_type:binary

server_type:simple

 

--客户端--

connect localhost:9090

protocol_type:binary

transport_type:fastframed

名字:另外一个烟火

 

五 工程文件结构

 


 有不明的地方建议先看Apache Thrift 初学小讲(一),附件src.rar是源代码文件。

  • src.rar (12.5 KB)
  • 下载次数: 21
2
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics