`
com0606
  • 浏览: 60373 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

侃侃tomcat的bio,nio

 
阅读更多

对于这2种io以及在socket的应用不作描述,主要探究一下在tomcat中是如何应用这2种io的。找了2个版本的tomcat做一下对比,以tomcat4、tomcat6为例。

1 .tomcat4

在tomcat4中,只有bio的使用。首先看这个类

public final class HttpConnector
    implements Connector, Lifecycle, Runnable

  它实现了Runnable接口以及Lifecycle接口,在tomcat中,实现Lifecycle接口的类都需要实现start和stop等接口,作为一种规范,在类被实例化后start方法会被显式调用;并且在启动和销毁时都会触发相应事件,这里不多说。

HttpConnector在被实例化后它的start方法会被调用

 

private Stack processors = new Stack();

public void start() throws LifecycleException {

    // Validate and update our current state
    if (started)
        throw new LifecycleException
            (sm.getString("httpConnector.alreadyStarted"));
    threadName = "HttpConnector[" + port + "]";
    lifecycle.fireLifecycleEvent(START_EVENT, null);
    started = true;

    // Start our background thread
    threadStart();

    // Create the specified minimum number of processors
    while (curProcessors < minProcessors) {
        if ((maxProcessors > 0) && (curProcessors >= maxProcessors))
            break;
        HttpProcessor processor = newProcessor();
        recycle(processor);
    }

}

private void threadStart() {

    log(sm.getString("httpConnector.starting"));

    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();

}

public void run() {
    // Loop until we receive a shutdown command
    while (!stopped) {
        // Accept the next incoming connection from the server socket
        Socket socket = null;
        try {
            //                if (debug >= 3)
            //                    log("run: Waiting on serverSocket.accept()");
            socket = serverSocket.accept();
            //                if (debug >= 3)
            //                    log("run: Returned from serverSocket.accept()");
            if (connectionTimeout > 0)
                socket.setSoTimeout(connectionTimeout);
            socket.setTcpNoDelay(tcpNoDelay);
        } catch (AccessControlException ace) {
            log("socket accept security exception", ace);
            continue;
        } catch (IOException e) {
            //                if (debug >= 3)
            //                    log("run: Accept returned IOException", e);
            try {
                // If reopening fails, exit
                synchronized (threadSync) {
                    if (started && !stopped)
                        log("accept error: ", e);
                    if (!stopped) {
                        //                    if (debug >= 3)
                        //                        log("run: Closing server socket");
                        serverSocket.close();
                        //                        if (debug >= 3)
                        //                            log("run: Reopening server socket");
                        serverSocket = open();
                    }
                }
                //                    if (debug >= 3)
                //                        log("run: IOException processing completed");
            } catch (IOException ioe) {
                log("socket reopen, io problem: ", ioe);
                break;
            } catch (KeyStoreException kse) {
                log("socket reopen, keystore problem: ", kse);
                break;
            } catch (NoSuchAlgorithmException nsae) {
                log("socket reopen, keystore algorithm problem: ", nsae);
                break;
            } catch (CertificateException ce) {
                log("socket reopen, certificate problem: ", ce);
                break;
            } catch (UnrecoverableKeyException uke) {
                log("socket reopen, unrecoverable key: ", uke);
                break;
            } catch (KeyManagementException kme) {
                log("socket reopen, key management problem: ", kme);
                break;
            }

            continue;
        }

        // Hand this socket off to an appropriate processor
        HttpProcessor processor = createProcessor();
        if (processor == null) {
            try {
                log(sm.getString("httpConnector.noProcessor"));
                socket.close();
            } catch (IOException e) {
                ;
            }
            continue;
        }
        //            if (debug >= 3)
        //                log("run: Assigning socket to processor " + processor);
        processor.assign(socket);

        // The processor will recycle itself when it finishes

    }

    // Notify the threadStop() method that we have shut ourselves down
    //        if (debug >= 3)
    //            log("run: Notifying threadStop() that we have shut down");
    synchronized (threadSync) {
        threadSync.notifyAll();
    }

}

 

调用start方法,线程被启动,在此之前会创建好ServerSocket,如果ServerSocket异常关闭会重新启动。接着run方法内会一直while循环,调用ServerSocket的accept从请求队列中获取套接字socket

 

private ServerSocket open()
throws IOException, KeyStoreException, NoSuchAlgorithmException,
       CertificateException, UnrecoverableKeyException,
       KeyManagementException
{

    // Acquire the server socket factory for this Connector
    ServerSocketFactory factory = getFactory();

    // If no address is specified, open a connection on all addresses
    if (address == null) {
        log(sm.getString("httpConnector.allAddresses"));
        try {
            return (factory.createSocket(port, acceptCount));
        } catch (BindException be) {
            throw new BindException(be.getMessage() + ":" + port);
        }
    }

    // Open a server socket on the specified address
    try {
        InetAddress is = InetAddress.getByName(address);
        log(sm.getString("httpConnector.anAddress", address));
        try {
            return (factory.createSocket(port, acceptCount, is));
        } catch (BindException be) {
            throw new BindException(be.getMessage() + ":" + address +
                                    ":" + port);
        }
    } catch (Exception e) {
        log(sm.getString("httpConnector.noAddress", address));
        try {
            return (factory.createSocket(port, acceptCount));
        } catch (BindException be) {
            throw new BindException(be.getMessage() + ":" + port);
        }
    }

}

 

在处理socket的时候,会开启线程异步执行,这样可以保证多线程同时处理多个请求,避免单线程只能处理一个请求而其他请求阻塞的情况。HttpConnector采用线程池的方式来实现线程的复用,用stack维护一定数量的HttpProcessor,在需要的时候从stack中pop线程处理请求

private HttpProcessor createProcessor() {

    synchronized (processors) {
        if (processors.size() > 0) {
            // if (debug >= 2)
            // log("createProcessor: Reusing existing processor");
            return ((HttpProcessor) processors.pop());
        }
        if ((maxProcessors > 0) && (curProcessors < maxProcessors)) {
            // if (debug >= 2)
            // log("createProcessor: Creating new processor");
            return (newProcessor());
        } else {
            if (maxProcessors < 0) {
                // if (debug >= 2)
                // log("createProcessor: Creating new processor");
                return (newProcessor());
            } else {
                // if (debug >= 2)
                // log("createProcessor: Cannot create new processor");
                return (null);
            }
        }
    }

}

 

再看处理请求的类

final class HttpProcessor
    implements Lifecycle, Runnable

 它也是实现了Runnable接口,run方法中一直执行while循环,只要有新的socket,就会调用process处理它。

synchronized void assign(Socket socket) {

    // Wait for the Processor to get the previous Socket
    while (available) {
        try {
            wait();
        } catch (InterruptedException e) {
        }
    }

    // Store the newly available Socket and notify our thread
    this.socket = socket;
    available = true;
    notifyAll();

    if ((debug >= 1) && (socket != null))
        log(" An incoming request is being assigned");

}

public void run() {

    // Process requests until we receive a shutdown signal
    while (!stopped) {

        // Wait for the next socket to be assigned
        Socket socket = await();
        if (socket == null)
            continue;

        // Process the request from this socket
        try {
            process(socket);
        } catch (Throwable t) {
            log("process.invoke", t);
        }

        // Finish up this request
        connector.recycle(this);

    }

    // Tell threadStop() we have shut ourselves down successfully
    synchronized (threadSync) {
        threadSync.notifyAll();
    }

}
 
当assign方法被调用,线程被唤醒,run方法中while循环执行,处理socket请求,当一个socket被处理完,调用recycle归还该线程至线程池。
void recycle(HttpProcessor processor) {

    //        if (debug >= 2)
    //            log("recycle: Recycling processor " + processor);
    processors.push(processor);

}
 可以看到,调用了Stack的push方法。
总结:tomcat4基于阻塞io的方法,使用基于Stack数据结构来维护线程池,可以开启多个线程异步同时处理多个并发请求。
 
2.tomcat6
tomcat6开始支持nio了,可以通过改conf/server.xml配置文件开启 
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" 
               connectionTimeout="20000" 
               redirectPort="8443" /> 
 
相关类在org.apache.catalina.tribes.transport包下。
默认是bio模式,实现方式虽然和tomcat4不一样,但是核心内容大差不差。相关类换成了
//ServerSocket启动,监听
public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback

//具体的处理socket类
public class BioReplicationThread extends WorkerThread
 
线程池的实现也和之前不同,维护了idle,used两个LinkedList,WorkerThread就在两个集合中来回切换。
package org.apache.catalina.tribes.transport;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * @author not attributable
 * @version 1.0
 */

public class ThreadPool
{
    /**
     * A very simple thread pool class.  The pool size is set at
     * construction time and remains fixed.  Threads are cycled
     * through a FIFO idle queue.
     */

    List idle = new LinkedList();
    List used = new LinkedList();
    
    Object mutex = new Object();
    boolean running = true;
    
    private static int counter = 1;
    private int maxThreads;
    private int minThreads;
    
    private ThreadCreator creator = null;

    private static synchronized int inc() {
        return counter++;
    }

    
    public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
        // fill up the pool with worker threads
        this.maxThreads = maxThreads;
        this.minThreads = minThreads;
        this.creator = creator;
        //for (int i = 0; i < minThreads; i++) {
        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
            WorkerThread thread = creator.getWorkerThread();
            setupThread(thread);
            idle.add (thread);
        }
    }
    
    protected void setupThread(WorkerThread thread) {
        synchronized (thread) {
            thread.setPool(this);
            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
            thread.setDaemon(true);
            thread.setPriority(Thread.MAX_PRIORITY);
            thread.start();
            try {thread.wait(500); }catch ( InterruptedException x ) {}
        }
    }

    /**
     * Find an idle worker thread, if any.  Could return null.
     */
    public WorkerThread getWorker()
    {
        WorkerThread worker = null;
        synchronized (mutex) {
            while ( worker == null && running ) {
                if (idle.size() > 0) {
                    try {
                        worker = (WorkerThread) idle.remove(0);
                    } catch (java.util.NoSuchElementException x) {
                        //this means that there are no available workers
                        worker = null;
                    }
                } else if ( used.size() < this.maxThreads && creator != null) {
                    worker = creator.getWorkerThread();
                    setupThread(worker);
                } else {
                    try { mutex.wait(); } catch ( InterruptedException x ) {Thread.currentThread().interrupted();}
                }
            }//while
            if ( worker != null ) used.add(worker);
        }
        return (worker);
    }
    
    public int available() {
        return idle.size();
    }

    /**
     * Called by the worker thread to return itself to the
     * idle pool.
     */
    public void returnWorker (WorkerThread worker) {
        if ( running ) {
            synchronized (mutex) {
                used.remove(worker);
                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
                else {
                    worker.setDoRun(false);
                    synchronized (worker){worker.notify();}
                }
                mutex.notify();
            }
        }else {
            worker.setDoRun(false);
            synchronized (worker){worker.notify();}
        }
    }

    public int getMaxThreads() {
        return maxThreads;
    }

    public int getMinThreads() {
        return minThreads;
    }

    public void stop() {
        running = false;
        synchronized (mutex) {
            Iterator i = idle.iterator();
            while ( i.hasNext() ) {
                WorkerThread worker = (WorkerThread)i.next();
                returnWorker(worker);
                i.remove();
            }
        }
    }

    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    public void setMinThreads(int minThreads) {
        this.minThreads = minThreads;
    }

    public ThreadCreator getThreadCreator() {
        return this.creator;
    }
    
    public static interface ThreadCreator {
        public WorkerThread getWorkerThread();
    }
}
 
在开启了nio模式后,tomcat6就会基于nio的方式启动,工作。
nio主线程启动,监听主要看这个类 
public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback
 
看关键部分代码
public void start() throws IOException {
    try {
    //            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
        setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
    } catch (Exception x) {
        log.fatal("ThreadPool can initilzed. Listener not started", x);
        if ( x instanceof IOException ) throw (IOException)x;
        else throw new IOException(x.getMessage());
    }
    try {
        getBind();
        bind();
        Thread t = new Thread(this, "NioReceiver");
        t.setDaemon(true);
        t.start();
    } catch (Exception x) {
        log.fatal("Unable to start cluster receiver", x);
        if ( x instanceof IOException ) throw (IOException)x;
        else throw new IOException(x.getMessage());
    }
}

protected void bind() throws IOException {
    // allocate an unbound server socket channel
    serverChannel = ServerSocketChannel.open();
    // Get the associated ServerSocket to bind it with
    ServerSocket serverSocket = serverChannel.socket();
    // create a new Selector for use below
    selector = Selector.open();
    // set the port the server channel will listen to
    //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
    bind(serverSocket,getTcpListenPort(),getAutoBind());
    // set non-blocking mode for the listening socket
    serverChannel.configureBlocking(false);
    // register the ServerSocketChannel with the Selector
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    
}

protected void listen() throws Exception {
    if (doListen()) {
        log.warn("ServerSocketChannel already started");
        return;
    }
    
    setListen(true);

    while (doListen() && selector != null) {
        // this may block for a long time, upon return the
        // selected set contains keys of the ready channels
        try {
            events();
            socketTimeouts();
            int n = selector.select(getTcpSelectorTimeout());
            if (n == 0) {
                //there is a good chance that we got here
                //because the TcpReplicationThread called
                //selector wakeup().
                //if that happens, we must ensure that that
                //thread has enough time to call interestOps
//                    synchronized (interestOpsMutex) {
                    //if we got the lock, means there are no
                    //keys trying to register for the
                    //interestOps method
//                    }
                continue; // nothing to do
            }
            // get an iterator over the set of selected keys
            Iterator it = selector.selectedKeys().iterator();
            // look at each key in the selected set
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                // Is a new connection coming in?
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.socket().setReceiveBufferSize(getRxBufSize());
                    channel.socket().setSendBufferSize(getTxBufSize());
                    channel.socket().setTcpNoDelay(getTcpNoDelay());
                    channel.socket().setKeepAlive(getSoKeepAlive());
                    channel.socket().setOOBInline(getOoBInline());
                    channel.socket().setReuseAddress(getSoReuseAddress());
                    channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
                    channel.socket().setTrafficClass(getSoTrafficClass());
                    channel.socket().setSoTimeout(getTimeout());
                    Object attach = new ObjectReader(channel);
                    registerChannel(selector,
                                    channel,
                                    SelectionKey.OP_READ,
                                    attach);
                }
                // is there data to read on this channel?
                if (key.isReadable()) {
                    readDataFromSocket(key);
                } else {
                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                }

                // remove key from selected set, it's been handled
                it.remove();
            }
        } catch (java.nio.channels.ClosedSelectorException cse) {
            // ignore is normal at shutdown or stop listen socket
        } catch (CancelledKeyException nx) {
            log.warn("Replication client disconnected, error when polling key. Ignoring client.");
        } catch (Throwable x) {
            try {
                log.error("Unable to process request in NioReceiver", x);
            }catch ( Throwable tx ) {
                //in case an out of memory error, will affect the logging framework as well
                tx.printStackTrace();
            }
        }

    }
    serverChannel.close();
    if (selector != null)
        selector.close();
}

protected void registerChannel(Selector selector,
                               SelectableChannel channel,
                               int ops,
                               Object attach) throws Exception {
    if (channel == null)return; // could happen
    // set the new channel non-blocking
    channel.configureBlocking(false);
    // register it with the selector
    channel.register(selector, ops, attach);
}

public void run() {
    try {
        listen();
    } catch (Exception x) {
        log.error("Unable to run replication listener.", x);
    }
}

protected void readDataFromSocket(SelectionKey key) throws Exception {
    NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
    if (worker == null) {
        // No threads available, do nothing, the selection
        // loop will keep calling this method until a
        // thread becomes available, the thread pool itself has a waiting mechanism
        // so we will not wait here.
        if (log.isDebugEnabled())
            log.debug("No TcpReplicationThread available");
    } else {
        // invoking this wakes up the worker thread then returns
        worker.serviceChannel(key);
    }
}
 
线程被启动,run执行,ServerSocketChannel和Selector被创建,绑定端口并且注册accept事件。while循环,调用selector的select方法。并且设定阻塞事件,这样既不会在没有请求需要处理的时候线程一直阻塞,也不会不停loop一直占用cpu。当请求到达,就会有acceptable的SelectionKey,主线程会循环处理完这些SelectionKey。获取SocketChannel并将其注册到selector,注册read事件。待后续select被调用会取出这些readable的SelectionKey。调用readDataFromSocket方法,从线程池获取worker来处理请求。
再看NioReplicationThread的实现
public class NioReplicationThread extends WorkerThread {
    
    private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );
    
    private ByteBuffer buffer = null;
    private SelectionKey key;
    private int rxBufSize;
    private NioReceiver receiver;
    public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
    {
        super(callback);
        this.receiver = receiver;
    }

    // loop forever waiting for work to do
    public synchronized void run() { 
        this.notify();
        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
            buffer = ByteBuffer.allocateDirect(getRxBufSize());
        }else {
            buffer = ByteBuffer.allocate (getRxBufSize());
        }
        while (isDoRun()) {
            try {
                // sleep and release object lock
                this.wait();
            } catch (InterruptedException e) {
                if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
                // clear interrupt status
                Thread.interrupted();
            }
            if (key == null) {
                continue;	// just in case
            }
            if ( log.isTraceEnabled() ) 
                log.trace("Servicing key:"+key);

            try {
                ObjectReader reader = (ObjectReader)key.attachment();
                if ( reader == null ) {
                    if ( log.isTraceEnabled() ) 
                        log.trace("No object reader, cancelling:"+key);
                    cancelKey(key);
                } else {
                    if ( log.isTraceEnabled() ) 
                        log.trace("Draining channel:"+key);

                    drainChannel(key, reader);
                }
            } catch (Exception e) {
                //this is common, since the sockets on the other
                //end expire after a certain time.
                if ( e instanceof CancelledKeyException ) {
                    //do nothing
                } else if ( e instanceof IOException ) {
                    //dont spew out stack traces for IO exceptions unless debug is enabled.
                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
                } else if ( log.isErrorEnabled() ) {
                    //this is a real error, log it.
                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
                } 
                cancelKey(key);
            } finally {
                
            }
            key = null;
            // done, ready for more, return to pool
            getPool().returnWorker (this);
        }
    }

    /**
     * Called to initiate a unit of work by this worker thread
     * on the provided SelectionKey object.  This method is
     * synchronized, as is the run() method, so only one key
     * can be serviced at a given time.
     * Before waking the worker thread, and before returning
     * to the main selection loop, this key's interest set is
     * updated to remove OP_READ.  This will cause the selector
     * to ignore read-readiness for this channel while the
     * worker thread is servicing it.
     */
    public synchronized void serviceChannel (SelectionKey key) {
        if ( log.isTraceEnabled() ) 
            log.trace("About to service key:"+key);
        ObjectReader reader = (ObjectReader)key.attachment();
        if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
        this.key = key;
        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
        this.notify();		// awaken the thread
    }

    /**
     * The actual code which drains the channel associated with
     * the given key.  This method assumes the key has been
     * modified prior to invocation to turn off selection
     * interest in OP_READ.  When this method completes it
     * re-enables OP_READ and calls wakeup() on the selector
     * so the selector will resume watching this channel.
     */
    protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
        reader.setLastAccess(System.currentTimeMillis());
        reader.access();
        SocketChannel channel = (SocketChannel) key.channel();
        int count;
        buffer.clear();			// make buffer empty

        // loop while data available, channel is non-blocking
        while ((count = channel.read (buffer)) > 0) {
            buffer.flip();		// make buffer readable
            if ( buffer.hasArray() ) 
                reader.append(buffer.array(),0,count,false);
            else 
                reader.append(buffer,count,false);
            buffer.clear();		// make buffer empty
            //do we have at least one package?
            if ( reader.hasPackage() ) break;
        }

        int pkgcnt = reader.count();
        
        if (count < 0 && pkgcnt == 0 ) {
            //end of stream, and no more packages to process
            remoteEof(key);
            return;
        }
        
        ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
        
        registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
        
        for ( int i=0; i<msgs.length; i++ ) {
            /**
             * Use send ack here if you want to ack the request to the remote 
             * server before completing the request
             * This is considered an asynchronized request
             */
            if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
            try {
                if ( Logs.MESSAGES.isTraceEnabled() ) {
                    try {
                        Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
                    }catch ( Throwable t ) {}
                }
                //process the message
                getCallback().messageDataReceived(msgs[i]);
                /**
                 * Use send ack here if you want the request to complete on this 
                 * server before sending the ack to the remote server
                 * This is considered a synchronized request
                 */
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
            }catch ( RemoteProcessException e ) {
                if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
            }catch ( Exception e ) {
                log.error("Processing of cluster message failed.",e);
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
            }
            if ( getUseBufferPool() ) {
                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
                msgs[i].setMessage(null);
            }
        }                        
        
        if (count < 0) {
            remoteEof(key);
            return;
        }
    }

    private void remoteEof(SelectionKey key) {
        // close channel on EOF, invalidates the key
        if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
        cancelKey(key);
    }

    protected void registerForRead(final SelectionKey key, ObjectReader reader) {
        if ( log.isTraceEnabled() ) 
            log.trace("Adding key for read event:"+key);
        reader.finish();
        //register our OP_READ interest
        Runnable r = new Runnable() {
            public void run() {
                try {
                    if (key.isValid()) {
                        // cycle the selector so this key is active again
                        key.selector().wakeup();
                        // resume interest in OP_READ, OP_WRITE
                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
                        key.interestOps(resumeOps);
                        if ( log.isTraceEnabled() ) 
                            log.trace("Registering key for read:"+key);
                    }
                } catch (CancelledKeyException ckx ) {
                    NioReceiver.cancelledKey(key);
                    if ( log.isTraceEnabled() ) 
                        log.trace("CKX Cancelling key:"+key);

                } catch (Exception x) {
                    log.error("Error registering key for read:"+key,x);
                }
            }
        };
        receiver.addEvent(r);
    }

    private void cancelKey(final SelectionKey key) {
        if ( log.isTraceEnabled() ) 
            log.trace("Adding key for cancel event:"+key);

        ObjectReader reader = (ObjectReader)key.attachment();
        if ( reader != null ) {
            reader.setCancelled(true);
            reader.finish();
        }
        Runnable cx = new Runnable() {
            public void run() {
                if ( log.isTraceEnabled() ) 
                    log.trace("Cancelling key:"+key);

                NioReceiver.cancelledKey(key);
            }
        };
        receiver.addEvent(cx);
    }
    
    



    /**
     * send a reply-acknowledgement (6,2,3)
     * @param key
     * @param channel
     */
    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
        
        try {
            ByteBuffer buf = ByteBuffer.wrap(command);
            int total = 0;
            while ( total < command.length ) {
                total += channel.write(buf);
            }
            if (log.isTraceEnabled()) {
                log.trace("ACK sent to " + channel.socket().getPort());
            }
        } catch ( IOException x ) {
            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
        }
    }

    public void setRxBufSize(int rxBufSize) {
        this.rxBufSize = rxBufSize;
    }

    public int getRxBufSize() {
        return rxBufSize;
    }
}
 
基本上就是获取SocketChannel并处理,和bio中处理socket逻辑差不多。
在这里tomcat是将自己封装的ObjectReader绑定到SelectionKey上,来分步处理这个有状态的ObjectReader,ObjectReader中维护了XByteBuffer,可以从维护的pool中获取XByteBuffer而不是直接创建。这一点和WorkThread中维护ByteBuffer一样,不用频繁的创建,减小内存开销。tomcat的处理过程和《Java NIO》这本书中的例子大体一致,只是tomcat的处理更加完善。
 
 
分享到:
评论

相关推荐

    node-v18.11.0-headers.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    JavaScript_跨平台3D场景编辑器基于threejs golang和mongodb桌面和web.zip

    JavaScript

    JavaScript_如何编写跨平台Nodejs代码.zip

    JavaScript

    北邮大三物流工程物流信息系统课程设计

    北邮大三物流工程物流信息系统课程设计

    0520_1.mov

    0520_1.mov

    实现CAD基础绘图显示功能(C#源码)

    使用C#开发的,一款非常简单的二维CAD绘图程序。 支持多种图元:线段、构造线、射线、多段线、圆、圆弧、文字。 缩放和平移视图。 支持图层。 图元支持夹点,并且可以通过移动夹点来修改图元。 捕捉。目前支持的捕捉类型有:端点、中点、中心点、象限点。 基本的编辑操作:删除、复制、镜像、偏移、移动。 撤销和重做。 支持点选和框选来选择图元。

    aspectjweaver-1.7.4.jar

    作为AspectJ编译器的一部分,aspectj-weaver.jar主要有以下作用: 切面织入:aspectj-weaver.jar可以将定义好的切面织入到Java应用程序的字节码中,实现横切关注点的模块化aspectjweaver.jar是AspectJ编织器的主要库文件,它提供了AspectJ编织器的核心功能。它可以在编译时或运行时将AspectJ切面(aspects)编织到Java类中,实现面向切面

    JavaScript_使用Meteor构建的开源看板保持变量字段名camelCase对于翻译只添加Pull Request更改

    JavaScript

    JavaScript_JS中最强大的数据验证库.zip

    JavaScript

    node-v14.17.5-headers.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    Android的移动应用下拉通知效果源码.rar

    Android的移动应用下拉通知效果源码.rar

    变电站呼吸器硅胶体破损

    变电站呼吸器硅胶体破损数据集,数据总共106张图片,标注为VOC格式

    JavaScript_Open Web Components指导开发Web组件的工具和库.zip

    JavaScript

    node-v16.19.1-headers.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    JavaScript_每个人都可以在这里贡献.zip

    JavaScript

    移动应用Android 腾讯微博客户端源码.rar

    移动应用Android 腾讯微博客户端源码.rar

    da_1716184227697..apk.1.1.1

    da_1716184227697..apk.1.1.1

    node-v6.1.0-headers.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    avalon-framework-api-4.3.1.jar

    avalon-framework-api-4.3.1.jar ======Avalon的组件模型一组联合的经典接口组成,用来定义容器和组件之间交换的物件。容器中的组件的需求使用和组件相关的元信息描述来表示。接口和缺省实现由Avalon ====== jeeplus需要用到的包

    固定资产管理系统.zip

    固定资产管理系统是对高校固定资产的一个信息化管理系统,基本功能包括:对固定资产的购进、接触、销毁,对物品的使用状态、借出状态、库存状态等进行标识,对各类物品进行编号,根据编号进行查询,根据名称进行查询等。本系统结构如下: (1)系统登录: 用户登录模块:登录功能 重置 (2)系统用户管理: 对系统用户的增加 系统用户的权限修改 系统用户的删除 分配系统用户的权限 修改本身登录密码 资产的相关维护 (3)员工信息管理: 教工的增加、修改、删除、查询 (4)资产入库管理: 资产的录入 资产的属性修改 资产的报废删除 资产的属性查询 (5)资产维护管理: 物资的维修、维护物资的信息修改 (7)资产借还管理: 增加借出资产 查询借出资产 归还已借出资产 (8)打印报表

Global site tag (gtag.js) - Google Analytics