- 浏览: 139019 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
zping:
7年后看这个文章,感触很深!
如何定义程序员的成功? -
hy2012_campus:
请问 怎么在一个独立的系统中使用,分库分表了,我在外部的一个独 ...
去年底写的mysql分库分表中间件heisenberg -
天塔上的猫:
很棒的想法!
去年底写的mysql分库分表中间件heisenberg -
zhbf5156:
第一眼就觉得和cobar很像,是基于cobar二次开发的吗?
去年底写的mysql分库分表中间件heisenberg -
atomduan:
熊~,暴露什么了啊[WARNING] Could not tr ...
去年底写的mysql分库分表中间件heisenberg
最近看上了mina,其性能多多,完全不用说了。。,本来是想搞个file+msg传输的,结果。。被这个handler给 block 了,不知道这个东西扎用。。。按其Iohandler interface implement methods+ processStreamIo abstract method 写了一个sample,发现死活发不了包。。。哥怒了。
没办法,开源的东西还是有好处地,可以深入,下了源码,发现其结构真的很不错,IoService(类似与和customer communication 的一个接口: session )<==IoProcessor(这个好象在code business 的时候没看到,后来才知道是用于处理IoFilter 的)<===IoFilter(*) ,IoHandler(business handler) 。
原来,StreamIoHandler 的 里面wrap 了两个内部out,in,并且对IoHandler 的每个method 都做了实现,迷底马上揭晓了。。。see the blew:
@Override public void sessionOpened(IoSession session) { // Set timeouts session.getConfig().setWriteTimeout(writeTimeout); session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout); // Create streams InputStream in = new IoSessionInputStream(); OutputStream out = new IoSessionOutputStream(session); session.setAttribute(KEY_IN, in); session.setAttribute(KEY_OUT, out); processStreamIo(session, in, out); }
划克,把processStreamIo template method 了。。(PS:曾经偶在写portal 里用到过这种pattern)
然后其他的iohandler 的method 里面也全写满了io logic....最后session closed 才把in ,out stream close .
我想,这样一来应该每传一个文件就回connect 一次了。。
费话说完了。。code ==>
/* * To change this template, choose Tools | Templates * and open the template in the editor. */ package minastudy.t2; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.session.IoSession; import org.apache.mina.handler.demux.DemuxingIoHandler; import org.apache.mina.handler.stream.StreamIoHandler; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; /** * * @author zx04741 */ public class Server { public static void main(String args[]) throws IOException { NioSocketAcceptor acceptor = new NioSocketAcceptor(); // DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); // chain.addLast("codec", new ProtocolCodecFilter( // new TextLineCodecFactory())); StreamIoHandler handler = new StreamIoHandler() { private ExecutorService pool = Executors.newCachedThreadPool(); // public void exceptionCaught(IoSession session, Throwable cause) { // super.exceptionCaught(session, cause); // System.out.println(cause); // } // // public void sessionOpened(IoSession session) { // super.sessionOpened(session); // System.out.println(" session open...."); // } @Override protected void processStreamIo(IoSession session, InputStream in, OutputStream out) { System.out.println("process stream..."); pool.execute(new Work(session, in, out)); } }; acceptor.setHandler(handler); acceptor.bind(new InetSocketAddress(8889)); } } class Work extends Thread { private InputStream in; private OutputStream out; private IoSession session; public Work(IoSession session, InputStream in, OutputStream out) { this.in = in; this.out = out; this.session = session; } public void run() { try { FileOutputStream fos = new FileOutputStream(new File("c:/jre(1).z01")); byte[] buf = new byte[2048]; int offset = 0; while ((offset = in.read(buf)) != -1) { System.out.println(offset); fos.write(buf, 0, offset); fos.flush(); } fos.close(); System.out.println("session over.."); } catch (IOException ex) { Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex); } } } /* * To change this template, choose Tools | Templates * and open the template in the editor. */ package minastudy.t2; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.channels.FileChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.session.IoSession; import org.apache.mina.handler.stream.StreamIoHandler; import org.apache.mina.transport.socket.nio.NioSocketConnector; /** * * @author zx04741 */ public class Client { public static void main(String args[]) throws FileNotFoundException { NioSocketConnector connector = new NioSocketConnector(); DefaultIoFilterChainBuilder chain = connector.getFilterChain(); connector.setHandler(new StreamIoHandler() { // private ExecutorService pool = Executors.newCachedThreadPool(); // public void exceptionCaught(IoSession session, Throwable cause) { // super.exceptionCaught(session, cause); // System.out.println(cause); // } // public void messageSent(IoSession session, Object message) throws Exception { // // Empty handler // super.messageSent(session, message); //// System.out.println("sent===>"); // } // // public void messageReceived(IoSession session, Object buf) { // super.messageReceived(session, buf); //// System.out.println("recv===>"); // } @Override protected void processStreamIo(IoSession session, InputStream in, OutputStream out) { //pool.execute(new Work(session, in, out)); System.out.println("client in process stream.."); try { String fileName = "C:/jdk15910/jre(1).z01"; File f = new File(fileName); byte[] buf = new byte[2048]; FileInputStream fis = new FileInputStream(f); int offset = 0; while (true) { offset = fis.read(buf); if (offset == -1) { break; } System.out.println(offset); out.write(buf, 0, offset); } //important must be waiting for over..
session.close(false); fis.close(); System.out.println("over.."); } catch (IOException ex) { Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex); } } }); ConnectFuture future1 = connector.connect(new InetSocketAddress(8889)); future1.awaitUninterruptibly(); if (!future1.isConnected()) { System.out.println("no connect..."); } IoSession session = future1.getSession(); System.out.println("session==>start"); } }
有很多忌讳的地方,client里面的out 不能flush ,否则回block住,。。。有点怪异,我看了下IoSessionOutputStream
@Override public synchronized void flush() throws IOException { if (lastWriteFuture == null) { return; } lastWriteFuture.awaitUninterruptibly(); if (!lastWriteFuture.isWritten()) { throw new IOException( "The bytes could not be written to the session"); } }
重写的flush 方法。。synchonized...要等到上次写完成后才可以写下次。。但是也不可能block 住所有的啊。。难道有相关的锁没有释放??先到这,回家再研究。。
problem traces 到 AbstractIoSession
原来真的是的,在AbstractIoSession里面有一个WriteFuture 的object锁,会将client当前的线程读写锁定在当前buffer之中,OK,这样就真相大白了.现在要做的就是把client改成多线程来处理这件事情了和Server一样的,确实如此,另外,在写的过程要保持Thread.sleep,这样CPU就不会达到很高...
code==>
public class Client {
public static void main(String args[]) throws FileNotFoundException { final NioSocketConnector connector = new NioSocketConnector(); DefaultIoFilterChainBuilder chain = connector.getFilterChain(); connector.setHandler(new StreamIoHandler() { private ExecutorService pool = Executors.newCachedThreadPool(); public void exceptionCaught(IoSession session, Throwable cause) { super.exceptionCaught(session, cause); System.out.println(cause); } public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); System.out.println(" session close.. "); } public void sessionOpened(IoSession session) { super.sessionOpened(session); System.out.println(" session open...."); } @Override protected void processStreamIo(IoSession session, InputStream in, OutputStream out) { pool.execute(new Worker2(session, in, out)); } }); ConnectFuture future1 = connector.connect(new InetSocketAddress(8889)); future1.awaitUninterruptibly(); if (!future1.isConnected()) { System.out.println("no connect..."); } else { System.out.println("connected session==>start"); } // IoSession session = future1.getSession(); } } class Worker2 extends Thread { private InputStream in; private OutputStream out; private IoSession session; public Worker2(IoSession session, InputStream in, OutputStream out) { this.in = in; this.out = out; this.session = session; } public void run() { System.out.println("client in process stream.."); try { //d:/drivers/sw8-Chipset.rar F:\\IDE\\netbeans-6.7.1-ml-windows.exe String fileName = "F:\\IDE\\netbeans-6.7.1-ml-windows.exe"; File f = new File(fileName); byte[] buf = new byte[2048]; FileInputStream fis = new FileInputStream(f); int offset = 0; int count = 0; while (true) { Thread.sleep(4); offset = fis.read(buf); count++; if(count % 1000==0) System.out.println(offset+","+System.currentTimeMillis()); if (offset == -1) { break; } out.write(buf, 0, offset); out.flush(); } fis.close(); session.close(false).awaitUninterruptibly(); System.out.println("over.."); } catch (InterruptedException ex) { Logger.getLogger(Worker2.class.getName()).log(Level.SEVERE, null, ex); } catch (IOException ex) { Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex); } } }
相关推荐
mina连接,mina心跳连接,mina断线重连。其中客户端可直接用在android上。根据各方参考资料,经过自己的理解弄出来的。CSDN的资源分太难得了。
mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...
mina的使用初步入门mina的使用初步入门mina的使用初步入门
里面包含mina2.0的api(英文)和mina自学手册,还有mina的开发指导
Apache MINA是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。 当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版...
Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)
mina的高级使用,mina文件图片传送,
深入理解Apache_Mina_(1)----_Mina的几个类 深入理解Apache_Mina_(2)----_与IoFilter相关的几个类 深入理解Apache_Mina_(3)----_与IoHandler相关的几个类 深入理解Apache_Mina_(4)----_IoFilter和IoHandler的区别和...
mina内部源码,可以深入的研究下,重构修改后获得的效率更加突出
mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...
许多刚接触mina的朋友,对于mina的编解码器的编写很迷惑.希望这个文档可以帮助朋友们少走弯路。 资源中是一个比较典型的编解码器写法。生成了可执行文件。并对编解码器的代码有详细注释。
1.mina socket客户度工程相关类,添加mina jar包后可独立运行。 2.mina若有空闲连接则使用已有连接,若无则新建mina连接; 3.mina空闲连接超过保活时间25分钟后,自动删除; 4.mina发送指令后,接收指定时长内收到的...
mina客户端,服务器端的demo
项目包含有mina的服务端与客户端,客户端发送心跳包,服务端响应心跳包
mina.jar 包,内涵mina所需的所有jar包, 解压即可,测试可用。mina.jar 包,内涵mina所需的所有jar包, 解压即可,测试可用。
MINA 2.0 User Guide Part I - Basics Chapter 1 - Getting Started Chapter 2 - Basics Chapter 3 - Service Chapter 4 - Session Chapter 5 - Filters Chapter 6 - Transports Chapter 7 - Handler Part II - ...
mina运行最基本的demo,实现客户端输入数据的返回。其中还包含mina的jar包
mina mina传输对象的示例 mina框架 mina示例
两个mina开发文档,含有所有的mina的简单开发案例与教程
本库是对我在项目中使用的Mina和长连接的一个封装,亲测有效,在网络良好的情况下,几乎能够保证100%的连接和通讯;