`
阅读更多


1. 传统Socket:阻塞式通信
在java传统socket技术中,每建立一个Socket连接时,须同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。
这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。下面的代码就说明了这一点。
a) server code:
package Socket;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
 
public class MultiUserServer extends Thread {
 
    private Socket client;
 
    public MultiUserServer(Socket c) {
        this.client = c;
    }
 
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
 
            // Mutil User but can’t parallel
            while (true) {
                String str = in.readLine();
 
                System.out.println("receive message: " + str);
                if (str.equals("end")) break;
            }
 
            client.close();
 
        } catch (IOException ex) {
 
        }
    }
 
    public static void main(String[] args) throws IOException {
        int port = 10086;
 
        if (args.length > 0)
            port = Integer.parseInt(args[0]);
        ServerSocket server = new ServerSocket(port);
        System.out.println("the server socket application is created!");
 
        while (true) {
            // transfer location change Single User or Multi User
            MultiUserServer mu = new MultiUserServer(server.accept());
            mu.start();
        }
    }
}
b) client code:
package Socket;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
 
public class Client {
    static Socket server;
 
    public static void main(String[] args) throws Exception {
        String host = "192.168.0. 10";
        int port = 10086;
 
        if (args.length > 1) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
 
        System.out.println("connetioning:" + host + ":" + port);
        server = new Socket(host, port);
        PrintWriter out = new PrintWriter(server.getOutputStream());
        BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));
 
        while (true) {
            String str = wt.readLine();
            out.println(str);
            out.flush();
 
            if (str.equals("end")) break;
        }
        server.close();
    }
}
2. nio socket: 非阻塞通讯模式
a) NIO 设计背后的基石:反应器模式
反应器模式: 用于事件多路分离和分派的体系结构模式。
反应器模式的核心功能如下:
将事件多路分用
将事件分派到各自相应的事件处理程序
b) NIO 的非阻塞 I/O 机制是围绕 选择器和 通道构建的。
选择器(Selector类): 是 Channel 的多路复用器。Selector 类将传入客户机请求多路分用并将它们分派到各自的请求处理程序。
通道(Channel 类):表示服务器和客户机之间的一种通信机制,一个通道负责处理一类请求/事件。
简单的来说:
NIO是一个基于事件的IO架构,最基本的思想就是:有事件我会通知你,你再去做与此事件相关的事情。而且NIO的主线程只有一个,不像传统的模型,需要多个线程以应对客户端请求,也减轻了JVM的工作量。
c) 当Channel注册至Selector以后,经典的调用方法如下:
while (somecondition) {
        int n = selector.select(TIMEOUT);
 
        if (n == 0) continue;
            for (Iterator iter = selector.selectedKeys().iterator(); iter.hasNext();) {
                if (key.isAcceptable()) doAcceptable(key);
                if (key.isConnectable()) doConnectable(key);
                if (key.isValid() && key.isReadable()) doReadable(key);
                if (key.isValid() && key.isWritable()) doWritable(key);
 
                iter.remove();
            }
        }
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。
Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。
d) Sample01
package NIO;
// 程序目的:学习Java NIO#SocketChannel

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
 
public class SocketChannelDemo {
    public static int PORT_NUMBER = 23;// 监听端口
    static String line = "";
    ServerSocketChannel serverChannel;
    ServerSocket serverSocket;
    Selector selector;
 
    private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
 
    public static void main(String[] args) throws Exception {
        SocketChannelDemo server = new SocketChannelDemo();
        server.init(args);
        server.startWork();
    }
 
    public void init(String[] argv) throws Exception {
        int port = PORT_NUMBER;
 
        if (argv.length > 0) port = Integer.parseInt(argv[0]);
 
        System.out.println("Listening on port " + port);
 
        // 分配一个ServerSocketChannel
        serverChannel = ServerSocketChannel.open();
 
        // 从ServerSocketChannel里获得一个对应的Socket
        serverSocket = serverChannel.socket();
 
        // 生成一个Selector
        selector = Selector.open();
 
        // 把Socket绑定到端口上
        serverSocket.bind(new InetSocketAddress(port));
 
        // serverChannel为非bolck
        serverChannel.configureBlocking(false);
 
        // 通过Selector注册ServerSocetChannel
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
   }
 
   public void startWork() throws Exception {
       while (true) {
           int n = selector.select();// 获得IO准备就绪的channel数量
 
           if (n == 0) continue; // 没有channel准备就绪,继续执行
 
           // 用一个iterator返回Selector的selectedkeys
           Iterator it = selector.selectedKeys().iterator();
 
           // 处理每一个SelectionKey
           while (it.hasNext()) {
               SelectionKey key = (SelectionKey) it.next();
 
               // 判断是否有新的连接到达
               if (key.isAcceptable()) {
                   // 返回SelectionKey的ServerSocketChannel
                   ServerSocketChannel server = (ServerSocketChannel) key.channel();
                   SocketChannel channel = server.accept();
                   registerChannel(selector, channel, SelectionKey.OP_READ);
                   doWork(channel);
               }
 
               // 判断是否有数据在此channel里需要读取
               if (key.isReadable()) processData(key);
 
               // 删除 selectedkeys
               it.remove();
           }
       }
   }
 
   protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception {
       if (channel == null) return;
 
       channel.configureBlocking(false);
       channel.register(selector, ops);
   }
 
   // 处理接收的数据
   protected void processData(SelectionKey key) throws Exception {
       SocketChannel socketChannel = (SocketChannel) key.channel();
       int count;
 
       buffer.clear(); // 清空buffer
 
       // 读取所有的数据
       while ((count = socketChannel.read(buffer)) > 0) {
           buffer.flip();
 
           // send the data, don′t assume it goes all at once
           while (buffer.hasRemaining()) {
               char c = (char) buffer.get();
               line += c;
 
               // 如果收到回车键,则在返回的字符前增加[echo]$字样,并且server端打印出字符串
               if (c == (char) 13) {
                   buffer.clear();
                   buffer.put("[echo]$".getBytes());
                   buffer.flip();
                   System.out.println(line); //
                   line = "";
               }
 
               socketChannel.write(buffer);// 在Socket里写数据
           }
 
           buffer.clear(); // 清空buffer
       }
       if (count < 0) socketChannel.close(); // count<0,说明已经读取完毕
   }
 
   private void doWork(SocketChannel channel) throws Exception {
       buffer.clear();
       buffer.put("Hello,I am working,please input some thing,and i will echo to you![echo]$".getBytes());
       buffer.flip();
       channel.write(buffer);
   }
}
运行此程序,然后在控制台输入命令telnet localhost 23。
e) Server code:
public class NonBlockingServer {
    public Selector sel = null;
    public ServerSocketChannel server = null;
    public SocketChannel socket = null;
    public int port = 4900;
    String result = null;
    public NonBlockingServer() {
        System.out.println("Inside default ctor");
    }
 
    public NonBlockingServer(int port) {
        System.out.println("Inside the other ctor");
        this.port = port;
    }
 
    public void initializeOperations() throws IOException,UnknownHostException {
        System.out.println("Inside initialization");
        sel = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        InetAddress ia = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(ia,port);
        server.socket().bind(isa);
    }
 
    public void startServer() throws IOException {
        System.out.println("Inside startserver");
        initializeOperations();
 
        System.out.println("Abt to block on select()");
        SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );
        while (acceptKey.selector().select() > 0 ) {
            Set readyKeys = sel.selectedKeys();
            Iterator it = readyKeys.iterator();
 
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey)it.next();
                it.remove();
                if (key.isAcceptable()) {
                    System.out.println("Key is Acceptable");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    socket = (SocketChannel) ssc.accept();
                    socket.configureBlocking(false);
                    SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                if (key.isReadable()) {
                    System.out.println("Key is readable");
                    String ret = readMessage(key);
                    if (ret.length() > 0) writeMessage(socket,ret);
                }
                if (key.isWritable()) {
                    System.out.println("THe key is writable");
                    String ret = readMessage(key);
                    socket = (SocketChannel)key.channel();
 
                    if (result.length() > 0 ) writeMessage(socket,ret);
                }
            }
        }
    }
 
    public void writeMessage(SocketChannel socket,String ret) {
        System.out.println("Inside the loop");
        if (ret.equals("quit") || ret.equals("shutdown")) return;
 
        try {
            String s = "This is content from server!—————————————–";
            Charset set = Charset.forName("us-ascii");
            CharsetDecoder dec = set.newDecoder();
            CharBuffer charBuf = dec.decode(ByteBuffer.wrap(s.getBytes()));
 
            System.out.println(charBuf.toString());
 
            int nBytes = socket.write(ByteBuffer.wrap((charBuf.toString()).getBytes()));
            System.out.println("nBytes = "+nBytes);
            result = null;
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
 
    public String readMessage(SelectionKey key) {
        int nBytes = 0;
        socket = (SocketChannel)key.channel();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            nBytes = socket.read(buf);
            buf.flip();
            Charset charset = Charset.forName("us-ascii");
            CharsetDecoder decoder = charset.newDecoder();
            CharBuffer charBuffer = decoder.decode(buf);
            result = charBuffer.toString();
        } catch(IOException e) {
            e.printStackTrace();
        }
        return result;
    }
 
    public static void main(String args[]) {
        NonBlockingServer nb;
        if (args.length < 1) nb = new NonBlockingServer()
        else {
            int port = Integer.parseInt(args[0]);
            nb = new NonBlockingServer(port);
        }
 
        try {
            nb.startServer();
            System.out.println("the nonBlocking server is started!");
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
2.2.4.2 Client code:
public class Client {
    public SocketChannel client = null;
    public InetSocketAddress isa = null;
    public RecvThread rt = null;
    private String host;
    private int port;
 
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }
 
    public void makeConnection() {
        String proxyHost = "192.168.254.212";
        String proxyPort = "1080";
 
        System.getProperties().put("socksProxySet", "true");
        System.getProperties().put("socksProxyHost", proxyHost);
        System.getProperties().put("socksProxyPort", proxyPort);
 
        int result = 0;
        try {
            client = SocketChannel.open();
            isa = new InetSocketAddress(host, port);
            client.connect(isa);
            client.configureBlocking(false);
            receiveMessage();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        long begin = System.currentTimeMillis();
        sendMessage();
        long end = System.currentTimeMillis();
        long userTime = end - begin;
        System.out.println("use tiem: " + userTime);
 
        try {
            interruptThread();
            client.close();
            System.exit(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public int sendMessage() {
        System.out.println("Inside SendMessage");
        String msg = null;
        ByteBuffer bytebuf;
        int nBytes = 0;
 
        try {
            msg = "It’s message from client!";
            System.out.println("msg is "+msg);
            bytebuf = ByteBuffer.wrap(msg.getBytes());
 
            for (int i = 0; i < 1000; i++) {
                nBytes = client.write(bytebuf);
                System.out.println(i + " finished");
            }
 
            interruptThread();
 
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
 
            client.close();
            return -1;
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        return nBytes;
    }
 
    public void receiveMessage() {
        rt = new RecvThread("Receive THread", client);
        rt.start();
    }
 
    public void interruptThread() {
        rt.val = false;
    }
 
    public static void main(String args[]) {
        if (args.length < 2) {
            System.err.println("You should put 2 args: host,port");
        } else {
            String host = args[0];
            int port = Integer.parseInt(args[1]);
 
            Client cl = new Client(host, port);
            cl.makeConnection();
        }
 
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String msg;
    }
 
    public class RecvThread extends Thread {
        public SocketChannel sc = null;
        public boolean val = true;
 
        public RecvThread(String str, SocketChannel client) {
            super(str);
            sc = client;
        }
 
        public void run() {
            int nBytes = 0;
            ByteBuffer buf = ByteBuffer.allocate(2048);
 
            try {
                while (val) {
                    while ((nBytes = nBytes = client.read(buf)) > 0) {
                        buf.flip();
                        Charset charset = Charset.forName("us-ascii");
                        CharsetDecoder decoder = charset.newDecoder();
                        CharBuffer charBuffer = decoder.decode(buf);
                        String result = charBuffer.toString();
                        System.out.println("the server return: " + result);
                        buf.flip();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Reactor模式和NIO
当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
Read request
Decode request
Process service
Encode reply
Send reply
经典的网络服务的设计如下图,在每个线程中完成对数据的处理:
但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。
Reactor模式类似于AWT中的Event处理:
Reactor模式参与者
1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener
如图:
Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。
我们来看看Reactor模式代码:
public class Reactor implements Runnable{
 
  final Selector selector;
  final ServerSocketChannel serverSocket;
 
  Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
    serverSocket.socket().bind(address);
 
    serverSocket.configureBlocking(false);
    //向selector注册该channel
     SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);
 
    logger.debug("–>Start serverSocket.register!");
 
    //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
    sk.attach(new Acceptor());
    logger.debug("–>attach(new Acceptor()!");
  }
 
  public void run() { // normally in a new Thread
    try {
    while (!Thread.interrupted())
    {
      selector.select();
      Set selected = selector.selectedKeys();
      Iterator it = selected.iterator();
      //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
      while (it.hasNext())
        //来一个事件第一次触发一个accepter线程
        //以后触发SocketReadHandler
        dispatch((SelectionKey)(it.next()));
        selected.clear();
      }
    }catch (IOException ex) {
        logger.debug("reactor stop!"+ex);
    }
  }
 
  //运行Acceptor或SocketReadHandler
  void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null){
      // r.run();
 
    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
    try {
      logger.debug("–>ready for accept!");
      SocketChannel c = serverSocket.accept();
      if (c != null)
        //调用Handler来处理channel
        new SocketReadHandler(selector, c);
      }
    catch(IOException ex) {
      logger.debug("accept stop!"+ex);
    }
    }
  }
}
以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。
再看看Handler代码:
public class SocketReadHandler implements Runnable {
 
  public static Logger logger = Logger.getLogger(SocketReadHandler.class);
 
  private Test test=new Test();
 
  final SocketChannel socket;
  final SelectionKey sk;
 
   static final int READING = 0, SENDING = 1;
  int state = READING;
 
  public SocketReadHandler(Selector sel, SocketChannel c)
    throws IOException {
 
    socket = c;
 
    socket.configureBlocking(false);
     sk = socket.register(sel, 0);
    //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
    //参看dispatch(SelectionKey k)
    sk.attach(this);
 
    //同时将SelectionKey标记为可读,以便读取。
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
  }
 
  public void run() {
    try{
    // test.read(socket,input);
      readRequest() ;
    }catch(Exception ex){
    logger.debug("readRequest error"+ex);
    }
  }
private void readRequest() throws Exception {
 
  ByteBuffer input = ByteBuffer.allocate(1024);
  input.clear();
  try {
    int bytesRead = socket.read(input);
    ……
    //激活线程池处理这些request
    requestHandle(new Request(socket,btt));
    …..
  }catch(Exception e) {
  }
}
注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理 写 发出等流程处理。
将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:
更进一步,我们可以使用多个Selector分别处理连接和读事件。
一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。
3. Socket网络框架 MINA
a) Overview
MINA是一个网络应用框架,在不牺牲性能和可扩展性的前提下用于解决如下问题:
快速开发自己的应用。
高可维护性,高可复用性:网络I/O编码,消息的编/解码,业务逻辑互相分离。
相对容易的进行单元测试。
b) MINA架构:
IoSessionManager: Where real I/O occurs
IoFilters: Filters I/O events • requests
IoHandler: Your protocol logic
IoSession: Represents a connection

IoFilters:
IoFilter为MINA的功能扩展提供了接口。它拦截所有的IO事件进行事件的预处理和河畜处理(AOP)。我们可以把它想象成Servlet的filters。
IoFilter能够实现以下几种目的:
事件日志
性能检测
数据转换(e.g. SSL support),codec
防火墙…等等
codec: ProtocolCodecFactory
MINA提供了方便的Protocol支持。如上说讲,codec在IoFilters中设置。
通过它的Encoder和Decoder,可以方便的扩展并支持各种基于Socket的网络协议,比如HTTP服务器、FTP服务器、Telnet服务器等等。
要实现自己的编码/解码器(codec)只需要实现interface: ProtocolCodecFactory即可.
在MINA 1.0版本,MINA已经实现了几个常用的(codec factory):
DemuxingProtocolCodecFactory,
NettyCodecFactory,
ObjectSerializationCodecFactory,
TextLineCodecFactory
其中:
TextLineCodecFactory:
A ProtocolCodecFactory that performs encoding and decoding between a text line data and a Java
string object. This codec is useful especially when you work with a text-based protocols such as SMTP and IMAP.
ObjectSerializationCodecFactory:
A ProtocolCodecFactory that serializes and deserializes Java objects. This codec is very useful when
you have to prototype your application rapidly without any specific codec.
DemuxingProtocolCodecFactory:
A composite ProtocolCodecFactory that consists of multiple MessageEncoders and MessageDecoders. ProtocolEncoder and ProtocolDecoder this factory returns demultiplex incoming messages and buffers to appropriate MessageEncoders and MessageDecoders.
NettyCodecFactory:
A MINA ProtocolCodecFactory that provides encoder and decoder for Netty2 Messages and MessageRecognizers.
IoHandler :business logic
MINA中,所有的业务逻辑都在实现了IoHandler的class完成。
Interface Handle:
all protocol events fired by MINA. There are 6 event handler methods, and they are all invoked by MINA automatically.
当事件发生时,将触发IoHandler中的方法:
sessionCreated:当一个session创建的时候调用;
sessionOpened:在sessionCreated调用之后被调用;
sessionClosed:当IO连接被关闭时被调用;
sessionIdle:当在远程实体和用户程序之间没有数据传输的时候被调用;
exceptionCaught:当IoAcceptor 或者IoHandler.中出现异常时被调用;
messageReceived:当接受到消息时调用;
messageSent:当发出请求时调用。
MINA 1.0中,IoHandler的实现类:
ChainedIoHandler
DemuxingIoHandler,
IoHandlerAdapter
SingleSessionIoHandlerDelegate
StreamIoHandler
具体细节可参考javadoc。
c) MINA的高级主题:线程模式
MINA通过它灵活的filter机制来提供多种线程模型。
没有线程池过滤器被使用时MINA运行在一个单线程模式。
如果添加了一个IoThreadPoolFilter到IoAcceptor,将得到一个leader-follower模式的线程池。
如果再添加一个ProtocolThreadPoolFilter,server将有两个线程池:
一个(IoThreadPoolFilter)被用于对message对象进行转换,另外一个(ProtocolThreadPoolFilter)被用于处理业务逻辑。
SimpleServiceRegistry加上IoThreadPoolFilter和ProtocolThreadPoolFilter的缺省实现即可适用于需要高伸缩性的应用。如果想使用自己的线程模型,请参考SimpleServiceRegistry的源代码,并且自己初始化Acceptor。
IoThreadPoolFilter threadPool = new IoThreadPoolFilter();threadPool.start();
IoAcceptor acceptor = new SocketAcceptor();
acceptor.getFilterChain().addLast( "threadPool", threadPool);
ProtocolThreadPoolFilter threadPool2 = new ProtocolThreadPoolFilter();
threadPool2.start();
ProtocolAcceptor acceptor2 = new IoProtocolAcceptor( acceptor );
acceptor2.getFilterChain().addLast( "threadPool", threadPool2 );

threadPool2.stop();
threadPool.stop();
d) 采用MINA进行socket开发,一般步骤如下:
Begin:
IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器
or client:
SocketConnector connector = new SocketConnector(); //建立一个连接器
server的属性配置:
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.setReuseAddress(true);
cfg.getFilterChain().addLast( "codec",
                     new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //对象序列化 codec factory
cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
绑定address和business logic
server:
acceptor.bind(new InetSocketAddress( SERVER_PORT ),
              new ServerSessionHandler( ), cfg ); // 绑定address和handler
client:
connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                  new ClientSessionHandler(msg), cfg );
实现自己的业务逻辑: IoHandler
如有必要,实现自己的CODEC
下面的代码演示了采用ObjectSerializationCodecFactory给服务端传送文件:
e) Client
public class Client {
    private static final String HOSTNAME = "192.168.0.81";
    private static final int PORT = 8080;
    private static final int CONNECT_TIMEOUT = 30; // seconds
 
    public static void main( String[] args ) throws Throwable {
        System.out.println("in nio client");
        SocketConnector connector = new SocketConnector();
        // Configure the service.
        SocketConnectorConfig cfg = new SocketConnectorConfig();
        cfg.setConnectTimeout( CONNECT_TIMEOUT );
        cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
        IoSession session;
        if(args.length > 1) {
           connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                    new ClientSessionHandler(args), cfg );
        } else {
           String[] files = {"E:/music/lcl/juhuatai.mp3",
                             "E:/music/lcl/jimosazhouleng.mp3"};
           connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                    new ClientSessionHandler(files), cfg );
        }
    }
}
f) Clint handle(client端的业务代码)
public class ClientSessionHandler extends IoHandlerAdapter {
    private String[] files;
 
    public ClientSessionHandler(String[] files) {
        this.files = files;
    }
 
    public void sessionOpened( IoSession session ) {
        for (int i = 0; i < this.files.length; i++) {
            Thread sendMessageThread = new SendMessageThread("Thread" + i, session,files[i]);
            sendMessageThread.start();
        }
    }
 
    public void messageReceived( IoSession session, Object message ) {
        System.out.println("in messageReceived!");
    }
 
    public void exceptionCaught( IoSession session, Throwable cause ) {
        session.close();
    }
 
    public class SendMessageThread extends Thread {
        private IoSession session;
        private String filename;
 
        public SendMessageThread(String name, IoSession session, String filename) {
            super(name);
            this.session = session;
            this.filename = filename;
        }
 
        public void run() {
            System.out.println("start thread: " + this.getName());
            try {
                ByteBuffer buf = ByteBuffer.allocate(Constants.BUF_SIZE);
 
                FileChannel fc = new FileInputStream(filename).getChannel();
 
                int index;
                while ((index = NioFileUtil.readFile(fc, buf)) > 0) {
                  buf.flip();
                  byte[] bs;
 
                  if (index == buf.capacity()) {
                      bs = buf.array();
                  } else {
                      bs = new byte[index];
 
                      int i = 0;
                      while (buf.hasRemaining()) {
                          bs[i++] = buf.get();
                      }
                  }
 
                  Message msg = new Message(filename,Constants.CMD_SEND, bs);
                  session.write(msg);
                }
 
                Message msg = new Message(filename, Constants.CMD_FINISHED, null);
                session.write(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
g) Server:
public class Server {
    private static final int SERVER_PORT = 8080;
 
    public static void main( String[] args ) throws Throwable {
        IoAcceptor acceptor = new SocketAcceptor();
        // Prepare the service configuration.
        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.setReuseAddress( true );
        cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.bind(
                new InetSocketAddress( SERVER_PORT ),
                new ServerSessionHandler( ), cfg );
        System.out.println( "nioFileServer Listening on port " + SERVER_PORT );
    }
}
h) Server handle:(Server端业务代码)
public class ServerSessionHandler extends IoHandlerAdapter {
    public void sessionOpened( IoSession session ) {
        // set idle time to 60 seconds
        System.out.println("in sessionOpened");
        session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );
        session.setAttribute("times",new Integer(0));
    }
 
    public void messageReceived( IoSession session, Object message ) {
        System.out.println("in messageReceived");
        Message msg = (Message) message;
        System.out.println("the file name is: " + msg.getFileName() + ""n");
       this.process(session, msg);
   }
 
   private void process(IoSession session, Message message) {
       String[] temparray = message.getFileName().split("[//]");
        String filename ="d:/" + temparray[temparray.length - 1];
        if (session.containsAttribute(message.getFileName())) {
            FileChannel channel = (FileChannel)session.getAttribute(message.getFileName());
            if (message.getType().equals(Constants.CMD_SEND)) {
                try {
                    NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    channel.close();
                    channel = null;
                    session.removeAttribute(message.getFileName());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            try {
                FileChannel channel = new FileOutputStream(filename).getChannel();
                NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));
                session.setAttribute(message.getFileName(), channel);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
 
    public void sessionIdle( IoSession session, IdleStatus status ) {
        SessionLog.info( session, "Disconnecting the idle." );
        // disconnect an idle client
        session.close();
    }
 
    public void exceptionCaught( IoSession session, Throwable cause ) {
        // close the connection on exceptional situation
        session.close();
    }
}
i) 文件操作:
public class NioFileUtil {
 
    public static void writeFile(FileChannel fileChannel, ByteBuffer buf) throws Exception {
        buf.clear();
        fileChannel.write(buf);
    }
 
    public static int readFile(FileChannel fileChannel,ByteBuffer buf) throws IOException {
        buf.rewind();
        int index = fileChannel.read(buf);
        return index;
    }
}
j) 常量:

public class Constants {
    public static final String CMD_FINISHED = "FINISHED";
    public static final String CMD_SEND = "SEND";
    public static final int BUF_SIZE = 10240;
    private Constants(){}
}
Demo
Introduction
org.apache.mina.example.chat
Chat server which demonstates using the text line codec and Spring integration.
org.apache.mina.example.chat.client
Swing based chat client.
org.apache.mina.example.echoserver
Echo server which demonstate

分享到:
评论
2 楼 nocb 2008-08-04  
好高深啊,学习中
你好,我想用mina写个客户端,从数据的队列中取数据,发送给服务端,如何写啊?
我不想每条数据建一个连接 。谢谢
hansen.nocb@gmail.com
1 楼 nocb 2008-08-04  
好高深啊,学习中
你好,我想用mina写个客户端,从数据的队列中取数据,发送给客户端,如何写啊?
我不想每条数据建一个连接 。谢谢
hansen.nocb@gmail.com

相关推荐

Global site tag (gtag.js) - Google Analytics