浏览 5792 次
锁定老帖子 主题:xSocket multiplexed介绍
精华帖 (0) :: 良好帖 (19) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-10-14
最后修改:2010-10-18
相关socket介绍请看 xsocket documentation xsocket Multiplexed是xSocket的一个扩展模块,xsocket Multiplexed的定义是支持一个物理TCP连接之上运行的多个逻辑连接。 说白了其实就是长连接的利用,所谓长连接就是Client方与Server方先建立通讯连接,连接建立后不断开, 然后再进行报文发送和接收。对于点对点的通讯,比较适合。使用xsocket Multiplexed进行持久连接通讯,对业务开发更加友好。 下面是一个同步客户端的例子: // 建立一个客户端的 multiplexed connection, //这个连接里面套了一个NonBlockingConnection,后面所有的逻辑连接都通过这个连接传输数据 INonBlockingConnection nativeCon = NonBlockingConnection(hostname, 9011); IMultiplexedConnection multiplexedCon = new MultiplexedConnection(nativeCon); // 建立一个逻辑连接 pipeline,然后所做的事情就是,向服务端写了一个header,默认实现是写了18个字节。 //VERSION byte(1) ; PIPELINE_OPENED byte(1) ; UUID(16) //服务端收到header后就会调用注册handler的onPipelineOpend方法 String controlPipelineId = multiplexedCon.createPipeline(); IBlockingPipeline controlPipeline = multiplexedCon.getBlockingPipeline(controlPipelineId); // 在发送数据时,会再向服务端写一个header //VERSION byte(1) ; PIPELINE_DATA byte(1) ; UUID(16) //告诉服务端,我要写正文了,服务端会调用注册handler的onPipelineData方法,开始准备接收数据处理业务逻辑 //在处理过程中建立新的逻辑连接进行处理,多个逻辑连接之间不会有影响 controlPipeline.write("NEW_MESSAGE_PIPELINE\r\n"); String messagePipelineId = commandPipeline.readStringByDelimiter("\r\n"); IBlockingPipeline messagePipeline = multiplexedCon.getBlockingPipeline(messagePipelineId); messagePipeline.write(messsagepart_1); messagePipeline.write(messsagepart_2); ... controlPipeline.write("CLOSE_MESSAGE_PIPELINE" + messagePipelineId + "\r\n"); //最后关闭业务连接,向服务端发送一个header //VERSION byte(1) ; PIPELINE_CLOSED byte(1) ; UUID(16) //服务端收到后就会调用注册handler的onPipelineClosed方法进行后续清理 //这个关闭仅仅是逻辑链接的关闭,并非主连接的关闭。 messagePipeline.close(); 服务端的处理 对于一个新的IMultiplexedConnection服务端会调度到线程池执行,而对于IMultiplexedConnection的逻辑链接默认是在IMultiplexedConnection所在的线程中执行的,你也可以将逻辑链接中的业务处理自行调度到线程池中去执行。 IServer server = new Server(9011, new MultiplexedProtocolAdapter(new CommandPipelineHandler())); ConnectionUtils.start(server); // pipeline data handler which is assigned to each new pipeline class CommandPipelineHandler implements IPipelineDataHandler { public boolean onData(INonBlockingPipeline pipeline) throws IOException { String cmd = pipeline.readStringByDelimiter("\r\n") if (cmd.equals("NEW_MESSAGE_PIPELINE")) { IMultiplexedConnection mplCon = pipeline.getMultiplexedConnection(); //这里定义了一个新的逻辑连接,并不和其他逻辑连接冲突 String msgPipelineId = mplCon.createPipeline(); INonBlockingPipeline msgPipeline = mplCon.getNonBlockingPipeline(msgPipelineId); // replace the CommandPipelineHandler of the new pipeline // 为新的逻辑连接指派一个handler, // 和当前handler在一个线程中,即使设定@Execution(Execution.MULTITHREADED) 也是没用的 msgPipeline.setHandler(new DataHandler()); pipeline.write(msgPipelineId + "\r\n"); //耗时业务处理可以自行进行线程池调度,比如 pipeline.getWorkerpool().execute(task..); } ... ... } } // A pipeline handler could also be a ordinary handler like IDataHandler class DataHandler implements IDataHandler { private FileChannel fc = null; DataHandler() { File file = ... fc = new RandomAccessFile(file, "rw").getChannel(); ... } public boolean onData(INonBlockingConnection pipeline) throws IOException { pipeline.transferTo(fc, pipeline.available()); ... return true; } } Multiplexer的实现 在Multiplexer可以定义发送header及如何读取数据的的各项细节,参见默认的实现可以了解的更多,基本上一目了然了。 默认实现是org.xsocket.connection.multiplexed.multiplexer.SimpleMultiplexer 你可以自定义个Multiplexer: // client-side custom multiplexed handling IMultiplexedConnection multiplexedCon = new MultiplexedConnection(nativeCon, new MyMultiplexer()); // server-side custom multiplexed handling IHandler adpater = new MultiplexedProtocolAdapter(new CommandPipelineHandler(), new MyMultiplexer())) IServer server = new Server(9011, adpater); class MyMultiplexer implements IMultiplexer { //打开一个新的逻辑连接,向对端发送一个header public String openPipeline(INonBlockingConnection nbc) throws IOException, ClosedException { ... create a unique pipelineId ... create a network packet ... and send it to the peer return pipelineId; } //关闭逻辑连接,向对端发送一个header public void closePipeline(INonBlockingConnection nbc, String pipelineId) throws IOException, ClosedException { ... create a network packet ... and send it to the peer } //发送正文的之前执行,想对端发送一个header public void multiplex(INonBlockingConnection nbc, String pipelineId, ByteBuffer[] dataToWrite) throws IOException, ClosedException { ... create a network packet based on the pipeline data ... and send it to the peer } public void multiplex(INonBlockingConnection nbc, String pipelineId, ByteBuffer[] dataToWrite, IWriteCompletionHandler completionHandler) throws IOException, ClosedChannelException { ... create a network packet based on the pipeline data ... and send it to the peer } //这个方法定义数据的接收处理 public void demultiplex(INonBlockingConnection nbc, IDemultiplexResultHandler rsltHdl) throws IOException, ClosedException { ... read the network packet ... notify the specific pipeline message //默认实现是: byte dataType = nbc.readByte(); ... switch (dataType) { case PIPELINE_DATA:...rsltHdl.onPipelineData(pipelineId, data); case PIPELINE_OPENED:...rsltHdl.onPipelineOpend(pipelineId, data); case PIPELINE_CLOSED:...rsltHdl.onPipelineClosed(pipelineId, data); default:... } ... } } 总的来说这个api还是非常轻巧实用的。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-10-16
图是用啥画的?
|
|
返回顶楼 | |
发表时间:2010-10-17
请问一下 这个东西的源代码 或者jar包能提供吗?
|
|
返回顶楼 | |
发表时间:2010-10-17
http://xsocket.sourceforge.net/上有下载
http://xsocket.sourceforge.net/download.htm |
|
返回顶楼 | |
发表时间:2010-10-17
xzqttt 写道 图是用啥画的?
官方网站上的图,看不出来具体什么画的,不过感觉用ppt的3d效果也能做出来的。 |
|
返回顶楼 | |
发表时间:2011-01-11
最后修改:2011-01-11
有中文注释的代码?强大哦
|
|
返回顶楼 | |