`
nkliuliu
  • 浏览: 207920 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Scalable IO in Java

阅读更多

       说实话转载的这篇文章没有看明白!

       当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
1. Read request
2. Decode request
3. Process service
4. Encode reply
5. Send reply

经典的网络服务的设计如下图,在每个线程中完成对数据的处理:

但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。

Reactor模式类似于AWT中的Event处理:

Reactor模式参与者

1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener

如图:

Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。

我们来看看Reactor模式代码:


public class Reactor implements Runnable{

  final Selector selector;
  final ServerSocketChannel serverSocket;

  Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
    serverSocket.socket().bind(address);

    serverSocket.configureBlocking(false);
    //向selector注册该channel
     SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

    logger.debug("-->Start serverSocket.register!");

    //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
    sk.attach(new Acceptor());
    logger.debug("-->attach(new Acceptor()!");
  }


  public void run() { // normally in a new Thread
    try {
    while (!Thread.interrupted())
    {
      selector.select();
      Set selected = selector.selectedKeys();
      Iterator it = selected.iterator();
      //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
      while (it.hasNext())
        //来一个事件 第一次触发一个accepter线程
        //以后触发SocketReadHandler
        dispatch((SelectionKey)(it.next()));
        selected.clear();
      }
    }catch (IOException ex) {
        logger.debug("reactor stop!"+ex);
    }
  }

  //运行Acceptor或SocketReadHandler
  void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null){
      // r.run();

    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
    try {
      logger.debug("-->ready for accept!");
      SocketChannel c = serverSocket.accept();
      if (c != null)
        //调用Handler来处理channel
        new SocketReadHandler(selector, c);
      }
    catch(IOException ex) {
      logger.debug("accept stop!"+ex);
    }
    }
  }
}

以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。

再看看Handler代码:

public class SocketReadHandler implements Runnable {

  public static Logger logger = Logger.getLogger(SocketReadHandler.class);

  private Test test=new Test();

  final SocketChannel socket;
  final SelectionKey sk;

   static final int READING = 0, SENDING = 1;
  int state = READING;

  public SocketReadHandler(Selector sel, SocketChannel c)
    throws IOException {

    socket = c;

    socket.configureBlocking(false);
     sk = socket.register(sel, 0);

    //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
    //参看dispatch(SelectionKey k)
    sk.attach(this);

    //同时将SelectionKey标记为可读,以便读取。
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
  }

  public void run() {
    try{
    // test.read(socket,input);
      readRequest() ;
    }catch(Exception ex){
    logger.debug("readRequest error"+ex);
    }
  }


/**
* 处理读取data
* @param key
* @throws Exception
*/
private void readRequest() throws Exception {

  ByteBuffer input = ByteBuffer.allocate(1024);
  input.clear();
  try{

    int bytesRead = socket.read(input);

    ......

    //激活线程池 处理这些request
    requestHandle(new Request(socket,btt));

    .....


  }catch(Exception e) {
  }

}

注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理 写 发出等流程处理。

将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:

更进一步,我们可以使用多个Selector分别处理连接和读事件。

一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。

分享到:
评论

相关推荐

    Scalable IO in Java.zip

    Scalable IO in Java是java.util.concurrent包的作者,大师Doug Lea关于分析与构建可伸缩的高性能IO服务的一篇经典文章,在文章中Doug Lea通过各个角度,循序渐进的梳理了服务开发中的相关问题,以及在解决问题的...

    scalable io in java

    Scalable in in java .Doug Lee的著作,学习线程模型必备。

    Scalable io in java.doc

    nio.pdf 博文链接:https://shenkun-918.iteye.com/blog/965654

    Scalable IO in Java -Doug Lea.rar

    Scalable IO in Java -Doug Lea.rarScalable IO in Java -Doug Lea.rar

    doug lea《Scalable IO in Java》

    doug lea 大神 Reactor 说明 Scalable IO in Java 原文必读 原文必读 原文必读

    Scalable IO in Java原文和翻译

    Doug Lea大神的一篇PPT文档和翻译文档,里面介绍了多种Reactor模型的设计,注意是借助翻译软件进行的翻译。

    Scalable IO in Java -Doug Lea

    Scalable IO in Java -Doug Lea 描述java nio 和reactor 设计模式之间的关系

    Scalable IO in Java doug lea.zip

    由doug lea 写的pdf,全是英文的,需要大家自己翻译一下

    Scalable IO in Java.pdf

    Scalable IO in Java.pdf

    Doug Lea Scalable IO in Java

    Scalable IO in Java -Doug Lea 学习NIO必看经典 描述java nio 和reactor 设计模式之间的关系

    scalable-io-in-java-中文.pdf

    网上都是不带书签,并且有些地方翻译有歧义。 所以我整理了一个。 特点:带书签 Scalable io in java 中文版,并且对有歧义的语义进行了修改。

    Scalable IO in Java.rar

    在服务端,注册服务对某个端口进行监听,然后使用阻塞的accept()函数,来取出请求队列中的socket,或者一直等待,直到收到客户端的请求。连接建立后,服务端接受输入流进行处理,返回结果给客户端。...

    Scalable IO in Java by Doug Lea

    Java NIO学习文档。值得学习。reactor 设计模式。作者Doug Lea。java.util.concurrent包的作者

    Scalable IO in Java -- Doug Lea

    Scalable network services,Event-driven processing,Reactor pattern and so on。

    scalable-io-in-java-中文1

    背景:AWT中的事件事件驱动 IO 使用类似的想法,但在设计上有所不同Reactor 模式• Reactor 通过分派合适的Handler来响应IO事件类似于

Global site tag (gtag.js) - Google Analytics