
Reading the MINA server startup source code

$home=org.apache.mina
During startup, the main work begins in the listener thread class
$home.core.polling.AbstractPollingIoAcceptor.Acceptor,
which handles the following cases.

case 1: the server has not been initialized yet, so it is bound and initialized
The binding code is in
$home.transport.socket.nio.NioSocketAcceptor.open(SocketAddress localAddress){
    // excerpt
    ServerSocketChannel channel = ServerSocketChannel.open();
    // the channel must be non-blocking before it can be registered with a selector
    channel.configureBlocking(false);
    ServerSocket socket = channel.socket();
    socket.setReuseAddress(isReuseAddress());
    socket.bind(localAddress, getBacklog());
    channel.register(selector, SelectionKey.OP_ACCEPT);
}
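The same sequence of JDK calls can be tried without MINA. The following is a minimal sketch in plain java.nio; the class name BindSketch, the helper openAcceptor, and the choice of port 0 (let the OS pick a free port) are assumptions made for illustration and are not part of MINA.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class BindSketch {
    // Hypothetical helper that follows the same order of calls as open():
    // open the channel, switch it to non-blocking, bind, then register for OP_ACCEPT.
    static ServerSocketChannel openAcceptor(Selector selector, SocketAddress address,
                                            boolean reuseAddress, int backlog) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        boolean success = false;
        try {
            // register() throws IllegalBlockingModeException on a blocking channel
            channel.configureBlocking(false);
            channel.socket().setReuseAddress(reuseAddress);
            channel.socket().bind(address, backlog);
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
            return channel;
        } finally {
            if (!success) {
                channel.close(); // do not leak the channel if bind or register failed
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel =
                     openAcceptor(selector, new InetSocketAddress("127.0.0.1", 0), true, 50)) {
            System.out.println("bound to " + channel.getLocalAddress());
        }
    }
}
```

The order matters: calling register before configureBlocking(false) fails, which is why the declaration and the non-blocking switch come first.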


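case 2: other events from the client (for example, an incoming connection)

  $home.core.polling.AbstractPollingIoAcceptor.Acceptor.processHandles(Iterator<H> handles){

	// A. obtain the session
	S session = accept(processor, handle);
	// B. initialize the session
	initSession(session, null, null);
	// C. hand the session to its processor, which starts a thread to handle events
	session.getProcessor().add(session);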
	
  } 



  The accept call in step A invokes
  $home.transport.socket.nio.NioSocketAcceptor.accept(IoProcessor<NioSession> processor, ServerSocketChannel handle){
        SelectionKey key = handle.keyFor(selector);

        // excerpt
        SocketChannel ch = handle.accept();
        return new NioSocketSession(this, processor, ch);
  }
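To see the select-then-accept step in isolation, here is a minimal sketch in plain java.nio. AcceptSketch and acceptOne are hypothetical names, and the loop is a simplification for illustration, not MINA's processHandles.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class AcceptSketch {
    // Hypothetical helper: waits up to timeoutMillis for a ready OP_ACCEPT key and
    // accepts the pending connection; returns null if nothing arrived in time.
    static SocketChannel acceptOne(Selector selector, long timeoutMillis) throws IOException {
        if (selector.select(timeoutMillis) == 0) {
            return null;
        }
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            SelectionKey key = keys.next();
            keys.remove(); // the selected-key set is not cleared automatically
            if (key.isValid() && key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel ch = server.accept(); // may be null in non-blocking mode
                if (ch != null) {
                    return ch;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client connect returns once the kernel has queued the connection.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = acceptOne(selector, 2000)) {
                System.out.println("accepted: " + (accepted != null));
            }
        }
    }
}
```

In MINA the accepted SocketChannel is then wrapped in a NioSocketSession, as the excerpt above shows; the sketch stops at the raw channel.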


The add method in step C calls
 $home.core.polling.AbstractPollingIoProcessor.startupProcessor(){
        // D
        Processor processor = processorRef.get();
        // excerpt: start a new thread to handle events
        if (processor == null) {
            processor = new Processor();

            if (processorRef.compareAndSet(null, processor)) {
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }
 }
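The compareAndSet guard is what keeps concurrent callers from starting more than one worker. The following is a minimal sketch of that pattern on its own; LazyWorkerSketch, startupWorker, and the submitted counter are hypothetical names, and unlike a real processor this sketch never clears the reference again.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class LazyWorkerSketch {
    private final AtomicReference<Runnable> workerRef = new AtomicReference<>();
    private final Executor executor;
    final AtomicInteger submitted = new AtomicInteger(); // how many workers were submitted

    LazyWorkerSketch(Executor executor) {
        this.executor = executor;
    }

    // Same shape as startupProcessor(): many threads may call this, but only the
    // caller whose compareAndSet succeeds hands a worker to the executor.
    void startupWorker() {
        Runnable worker = workerRef.get();
        if (worker == null) {
            worker = () -> { /* the event loop would run here */ };
            if (workerRef.compareAndSet(null, worker)) {
                submitted.incrementAndGet();
                executor.execute(worker);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        LazyWorkerSketch sketch = new LazyWorkerSketch(pool);
        int callers = 8;
        CountDownLatch done = new CountDownLatch(callers);
        for (int i = 0; i < callers; i++) {
            new Thread(() -> {
                sketch.startupWorker();
                done.countDown();
            }).start();
        }
        done.await();
        System.out.println("workers submitted: " + sketch.submitted.get());
        pool.shutdown();
    }
}
```

However many threads race into startupWorker, the reference changes from null exactly once, so exactly one worker is submitted.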


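The Processor at D is the inner class $home.core.polling.AbstractPollingIoProcessor.Processor.
It is a thread class (a Runnable); the main business method it drives is
 read(S session){
        // excerpt

        IoSessionConfig config = session.getConfig();
	// uses the session configuration that was set when the server was started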
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
        try {
            int readBytes = 0;
            int ret;
            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }
            // data was read: pass it through the configured filters in order
            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;
                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            // excerpt: the rest of the try block and its catch clause are omitted
 }
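The last branch of read adapts the buffer size to the traffic: shrink when less than half of the buffer was used, grow when it was filled completely. The following is a minimal sketch of that decision rule. ReadBufferSketch, nextBufferSize, and the MIN_SIZE and MAX_SIZE bounds are hypothetical, and the halving and doubling with clamping is an assumption about what decreaseReadBufferSize() and increaseReadBufferSize() do, not a copy of MINA's code.

```java
public class ReadBufferSketch {
    // Hypothetical bounds; in MINA the limits come from the session configuration.
    static final int MIN_SIZE = 64;
    static final int MAX_SIZE = 65536;

    // Returns the buffer size to use for the next read, given the current size
    // and the number of bytes the last read produced.
    static int nextBufferSize(int current, int readBytes) {
        if (readBytes << 1 < current) {
            // Less than half of the buffer was used: halve it, but not below the minimum.
            return Math.max(MIN_SIZE, current >>> 1);
        }
        if (readBytes == current) {
            // The buffer was filled completely: double it, but not above the maximum.
            return Math.min(MAX_SIZE, current << 1);
        }
        return current; // usage is between half and full: keep the size
    }

    public static void main(String[] args) {
        System.out.println(nextBufferSize(2048, 100));   // 1024
        System.out.println(nextBufferSize(2048, 2048));  // 4096
        System.out.println(nextBufferSize(2048, 1500));  // 2048
    }
}
```

Note that the resize only happens when hasFragmentation is true, that is, for stream transports where one read may return part of a message.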