`
uniseraph
  • 浏览: 82665 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

【mina指南】mina中的reactor模式(二)

阅读更多
NioProcessor是mina中的另一个核心部分,与NioSocketAcceptor类似,NioProcessor三个主要功能是:
1、接受一个NioSession
2、出来NioSession上的read、write等事件
3、关闭一个NioSession

与NioSocketAcceptor类似,NioProcessor的实现采用了template模式,以上功能整体流程在NioProcessor的父类AbstractPollingIoProcessor中基本完成了,NioSocketAcceptor只是针对Nio的情况完成实现。

创建NioProcessor

如上图,NioSocketAcceptor创建了SimpleIoProcessorPool,SimpleIoProcessorPool中默认存在cpu数+1个NioProcessor,并且这些NioProcessor的工作者线程共享一个线程池。

接受一个NioSession
与NioSocketAcceptor新增一个端口绑定类似,NioProcessor.addSession只是将NioSocketAcceptor新建的NioSession放入一个消息队列中,由工作者线程负责初始化该NioSession,在selector为该session注册OP_READ事件。

关闭一个NioSession
与接受一个NioSession类似,不再描述

NioSession的数据处理
为NioProcessor继承自AbstractPollingIoProcessor的工作者线程中完成主要功能
1. 在循环中,selector监听所有端口,注意在NioProcessor中的select超时时间为1秒,这意味着最多一秒钟的时候,NioProcessor.Worker线程唤醒一次。而在NioSocketAcceptor.Work.run中select是没有超时时间的。下面Worker线程两次唤醒之间简称为一个周期,易知一个周期的长度小于等于一秒。
        public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    boolean selected = select(1000);

                    nSessions += add();
                    updateTrafficMask();

                    if (selected) {
                        process();
                    }

                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= remove();
                    notifyIdleSessions(currentTime);

                    if (nSessions == 0) {
                        synchronized (lock) {
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                worker = null;
                                break;
                            }
                        }
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<T> i = allSessions(); i.hasNext(); ) {
                            scheduleRemove(i.next());
                        }
                        wakeup();
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (isDisposing()) {
                        dispose0();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }

2. 在addSession中从新增session队列newSeesion获取一个新增NioSession,并开始监控之。
    private int add() {
        int addedSessions = 0;
        
        // Loop on the new sessions blocking queue, to count
        // the number of sessions who has been created
        for (;;) {
            T session = newSessions.poll();

            if (session == null) {
                // We don't have anymore new sessions
                break;
            }


            if (addNow(session)) {
                // The new session has been added to the 
                addedSessions ++;
            }
        }

        return addedSessions;
    }

2.1 在addNow(T session)中完成了单个NioSession的初始化
  private boolean addNow(T session) {

        boolean registered = false;
        boolean notified = false;
        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            session.getService().getFilterChainBuilder().buildFilterChain(
                    session.getFilterChain());

            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
            notified = true;
        } catch (Throwable e) {
            if (notified) {
                // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
                // and call ConnectFuture.setException().
                scheduleRemove(session);
                session.getFilterChain().fireExceptionCaught(e);
                wakeup();
            } else {
                ExceptionMonitor.getInstance().exceptionCaught(e);
                try {
                    destroy(session);
                } catch (Exception e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                } finally {
                    registered = false;
                }
            }
        }
        return registered;
    }


2.1.1 初始化session,这里是Template Method的又一个体现,因为不同的类型的session初始化实现不同, 在NioProcessor中包括:设置非堵塞模式.为该session注册OP_READ事件。
 @Override
    protected void init(NioSession session) throws Exception {
        SelectableChannel ch = (SelectableChannel) session.getChannel();
        ch.configureBlocking(false);
        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
    }


2.1.2 构建IoFilterChain,具体请参考http://uniseraph.iteye.com/blog/228194

2.1.3 触发NioSession上的相关事件,依次为sessionCreated->sessionOpened ->
IoServiceListener的sessionCreated

3 修改sessio的traffic参数
从trafficControllingSessions获取需要修改session的traffic参数,具体与新增NioSession类似,不再详细描述。

4 接受处理socket数据并应答
关键内容来了,如果有NioSession的channel处理于OP_READ状态,则处理之
     if (selected) {
                        process();
                    }


process方法对于所有发生了OP_READ或OP_WRITE的NioSession依次进行处理,注意虽然在NioSession初始化的时候只注册了OP_READ事件,但是在上一周期调用session.write方法的时候,上一周期的flush方法将会注册OP_WRITE方法。本周期发送的数据都是上一周期确定的。

    private void process(T session) {

        if (isReadable(session) && session.getTrafficMask().isReadable()) {
            read(session);
        }

        if (isWritable(session) && session.getTrafficMask().isWritable()) {
            scheduleFlush(session);
        }
    }


在read方法中读取socket上的数据,调用发IoFilter,逐层传递到IoHander(具体参考http://uniseraph.iteye.com/blog/228194);如果读到-1,则增加一个关闭连接消息到队列中;如果发生异常,则异常调用IoFilter和IoHandler的fireExceptionCaught方法
    private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                session.getFilterChain().fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            session.getFilterChain().fireExceptionCaught(e);
        }
    }


发送数据


6 关闭连接及其他
如果没有消息积累,也没有新创建的连接,则关闭线程池

  • 大小: 9 KB
5
0
分享到:
评论
2 楼 wangwenjunHi 2010-01-13  
Aaronlee 写道
想问下,对于客户端传入的不同的MESSAGE ,在服务器端怎么解码?

至于怎么解码,就要看你如何进行解码器的实现了,通常来说服务端接收到的来自不同客户端的之间的通信协议都会是一致的,所以就按照协议的格式进行解码了
1 楼 Aaronlee 2009-06-15  
想问下,对于客户端传入的不同的MESSAGE ,在服务器端怎么解码?

相关推荐

Global site tag (gtag.js) - Google Analytics