对于这2种io以及在socket的应用不作描述,主要探究一下在tomcat中是如何应用这2种io的。找了2个版本的tomcat做一下对比,以tomcat4、tomcat6为例。
1 .tomcat4
在tomcat4中,只有bio的使用。首先看这个类
public final class HttpConnector
implements Connector, Lifecycle, Runnable
它实现了Runnable接口以及Lifecycle接口,在tomcat中,实现Lifecycle接口的类都需要实现start和stop等接口,作为一种规范,在类被实例化后start方法会被显式调用;并且在启动和销毁时都会触发相应事件,这里不多说。
HttpConnector在被实例化后它的start方法会被调用
private Stack processors = new Stack();
public void start() throws LifecycleException {
// Validate and update our current state
if (started)
throw new LifecycleException
(sm.getString("httpConnector.alreadyStarted"));
threadName = "HttpConnector[" + port + "]";
lifecycle.fireLifecycleEvent(START_EVENT, null);
started = true;
// Start our background thread
threadStart();
// Create the specified minimum number of processors
while (curProcessors < minProcessors) {
if ((maxProcessors > 0) && (curProcessors >= maxProcessors))
break;
HttpProcessor processor = newProcessor();
recycle(processor);
}
}
private void threadStart() {
log(sm.getString("httpConnector.starting"));
thread = new Thread(this, threadName);
thread.setDaemon(true);
thread.start();
}
public void run() {
// Loop until we receive a shutdown command
while (!stopped) {
// Accept the next incoming connection from the server socket
Socket socket = null;
try {
// if (debug >= 3)
// log("run: Waiting on serverSocket.accept()");
socket = serverSocket.accept();
// if (debug >= 3)
// log("run: Returned from serverSocket.accept()");
if (connectionTimeout > 0)
socket.setSoTimeout(connectionTimeout);
socket.setTcpNoDelay(tcpNoDelay);
} catch (AccessControlException ace) {
log("socket accept security exception", ace);
continue;
} catch (IOException e) {
// if (debug >= 3)
// log("run: Accept returned IOException", e);
try {
// If reopening fails, exit
synchronized (threadSync) {
if (started && !stopped)
log("accept error: ", e);
if (!stopped) {
// if (debug >= 3)
// log("run: Closing server socket");
serverSocket.close();
// if (debug >= 3)
// log("run: Reopening server socket");
serverSocket = open();
}
}
// if (debug >= 3)
// log("run: IOException processing completed");
} catch (IOException ioe) {
log("socket reopen, io problem: ", ioe);
break;
} catch (KeyStoreException kse) {
log("socket reopen, keystore problem: ", kse);
break;
} catch (NoSuchAlgorithmException nsae) {
log("socket reopen, keystore algorithm problem: ", nsae);
break;
} catch (CertificateException ce) {
log("socket reopen, certificate problem: ", ce);
break;
} catch (UnrecoverableKeyException uke) {
log("socket reopen, unrecoverable key: ", uke);
break;
} catch (KeyManagementException kme) {
log("socket reopen, key management problem: ", kme);
break;
}
continue;
}
// Hand this socket off to an appropriate processor
HttpProcessor processor = createProcessor();
if (processor == null) {
try {
log(sm.getString("httpConnector.noProcessor"));
socket.close();
} catch (IOException e) {
;
}
continue;
}
// if (debug >= 3)
// log("run: Assigning socket to processor " + processor);
processor.assign(socket);
// The processor will recycle itself when it finishes
}
// Notify the threadStop() method that we have shut ourselves down
// if (debug >= 3)
// log("run: Notifying threadStop() that we have shut down");
synchronized (threadSync) {
threadSync.notifyAll();
}
}
调用start方法,线程被启动,在此之前会创建好ServerSocket,如果ServerSocket异常关闭会重新启动。接着run方法内会一直while循环,调用ServerSocket的accept从请求队列中获取套接字socket
private ServerSocket open()
throws IOException, KeyStoreException, NoSuchAlgorithmException,
CertificateException, UnrecoverableKeyException,
KeyManagementException
{
// Acquire the server socket factory for this Connector
ServerSocketFactory factory = getFactory();
// If no address is specified, open a connection on all addresses
if (address == null) {
log(sm.getString("httpConnector.allAddresses"));
try {
return (factory.createSocket(port, acceptCount));
} catch (BindException be) {
throw new BindException(be.getMessage() + ":" + port);
}
}
// Open a server socket on the specified address
try {
InetAddress is = InetAddress.getByName(address);
log(sm.getString("httpConnector.anAddress", address));
try {
return (factory.createSocket(port, acceptCount, is));
} catch (BindException be) {
throw new BindException(be.getMessage() + ":" + address +
":" + port);
}
} catch (Exception e) {
log(sm.getString("httpConnector.noAddress", address));
try {
return (factory.createSocket(port, acceptCount));
} catch (BindException be) {
throw new BindException(be.getMessage() + ":" + port);
}
}
}
在处理socket的时候,会开启线程异步执行,这样可以保证多线程同时处理多个请求,避免单线程只能处理一个请求而其他请求阻塞的情况。HttpConnector采用线程池的方式来实现线程的复用,用stack维护一定数量的HttpProcessor,在需要的时候从stack中pop线程处理请求
private HttpProcessor createProcessor() {
synchronized (processors) {
if (processors.size() > 0) {
// if (debug >= 2)
// log("createProcessor: Reusing existing processor");
return ((HttpProcessor) processors.pop());
}
if ((maxProcessors > 0) && (curProcessors < maxProcessors)) {
// if (debug >= 2)
// log("createProcessor: Creating new processor");
return (newProcessor());
} else {
if (maxProcessors < 0) {
// if (debug >= 2)
// log("createProcessor: Creating new processor");
return (newProcessor());
} else {
// if (debug >= 2)
// log("createProcessor: Cannot create new processor");
return (null);
}
}
}
}
再看处理请求的类
final class HttpProcessor
implements Lifecycle, Runnable
它也是实现了Runnable接口,run方法中一直执行while循环,只要有新的socket,就会调用process处理它。
synchronized void assign(Socket socket) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
available = true;
notifyAll();
if ((debug >= 1) && (socket != null))
log(" An incoming request is being assigned");
}
public void run() {
// Process requests until we receive a shutdown signal
while (!stopped) {
// Wait for the next socket to be assigned
Socket socket = await();
if (socket == null)
continue;
// Process the request from this socket
try {
process(socket);
} catch (Throwable t) {
log("process.invoke", t);
}
// Finish up this request
connector.recycle(this);
}
// Tell threadStop() we have shut ourselves down successfully
synchronized (threadSync) {
threadSync.notifyAll();
}
}
当assign方法被调用,线程被唤醒,run方法中while循环执行,处理socket请求,当一个socket被处理完,调用recycle归还该线程至线程池。
void recycle(HttpProcessor processor) {
// if (debug >= 2)
// log("recycle: Recycling processor " + processor);
processors.push(processor);
}
可以看到,调用了Stack的push方法。
总结:tomcat4基于阻塞io的方法,使用基于Stack数据结构来维护线程池,可以开启多个线程异步同时处理多个并发请求。
2.tomcat6
tomcat6开始支持nio了,可以通过改conf/server.xml配置文件开启
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
connectionTimeout="20000"
redirectPort="8443" />
相关类在org.apache.catalina.tribes.transport包下。
默认是bio模式,实现方式虽然和tomcat4不一样,但是核心内容大差不差。相关类换成了
//ServerSocket启动,监听
public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback
//具体的处理socket类
public class BioReplicationThread extends WorkerThread
线程池的实现也和之前不同,维护了idle,used两个LinkedList,WorkerThread就在两个集合中来回切换。
package org.apache.catalina.tribes.transport;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* @author not attributable
* @version 1.0
*/
public class ThreadPool
{
/**
* A very simple thread pool class. The pool size is set at
* construction time and remains fixed. Threads are cycled
* through a FIFO idle queue.
*/
List idle = new LinkedList();
List used = new LinkedList();
Object mutex = new Object();
boolean running = true;
private static int counter = 1;
private int maxThreads;
private int minThreads;
private ThreadCreator creator = null;
private static synchronized int inc() {
return counter++;
}
public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
// fill up the pool with worker threads
this.maxThreads = maxThreads;
this.minThreads = minThreads;
this.creator = creator;
//for (int i = 0; i < minThreads; i++) {
for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
WorkerThread thread = creator.getWorkerThread();
setupThread(thread);
idle.add (thread);
}
}
protected void setupThread(WorkerThread thread) {
synchronized (thread) {
thread.setPool(this);
thread.setName(thread.getClass().getName() + "[" + inc() + "]");
thread.setDaemon(true);
thread.setPriority(Thread.MAX_PRIORITY);
thread.start();
try {thread.wait(500); }catch ( InterruptedException x ) {}
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
public WorkerThread getWorker()
{
WorkerThread worker = null;
synchronized (mutex) {
while ( worker == null && running ) {
if (idle.size() > 0) {
try {
worker = (WorkerThread) idle.remove(0);
} catch (java.util.NoSuchElementException x) {
//this means that there are no available workers
worker = null;
}
} else if ( used.size() < this.maxThreads && creator != null) {
worker = creator.getWorkerThread();
setupThread(worker);
} else {
try { mutex.wait(); } catch ( InterruptedException x ) {Thread.currentThread().interrupted();}
}
}//while
if ( worker != null ) used.add(worker);
}
return (worker);
}
public int available() {
return idle.size();
}
/**
* Called by the worker thread to return itself to the
* idle pool.
*/
public void returnWorker (WorkerThread worker) {
if ( running ) {
synchronized (mutex) {
used.remove(worker);
//if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
}
mutex.notify();
}
}else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
}
}
public int getMaxThreads() {
return maxThreads;
}
public int getMinThreads() {
return minThreads;
}
public void stop() {
running = false;
synchronized (mutex) {
Iterator i = idle.iterator();
while ( i.hasNext() ) {
WorkerThread worker = (WorkerThread)i.next();
returnWorker(worker);
i.remove();
}
}
}
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
public void setMinThreads(int minThreads) {
this.minThreads = minThreads;
}
public ThreadCreator getThreadCreator() {
return this.creator;
}
public static interface ThreadCreator {
public WorkerThread getWorkerThread();
}
}
在开启了nio模式后,tomcat6就会基于nio的方式启动,工作。
nio主线程启动,监听主要看这个类
public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback
看关键部分代码
public void start() throws IOException {
try {
// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
log.fatal("ThreadPool can initilzed. Listener not started", x);
if ( x instanceof IOException ) throw (IOException)x;
else throw new IOException(x.getMessage());
}
try {
getBind();
bind();
Thread t = new Thread(this, "NioReceiver");
t.setDaemon(true);
t.start();
} catch (Exception x) {
log.fatal("Unable to start cluster receiver", x);
if ( x instanceof IOException ) throw (IOException)x;
else throw new IOException(x.getMessage());
}
}
protected void bind() throws IOException {
// allocate an unbound server socket channel
serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// create a new Selector for use below
selector = Selector.open();
// set the port the server channel will listen to
//serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
bind(serverSocket,getTcpListenPort(),getAutoBind());
// set non-blocking mode for the listening socket
serverChannel.configureBlocking(false);
// register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
protected void listen() throws Exception {
if (doListen()) {
log.warn("ServerSocketChannel already started");
return;
}
setListen(true);
while (doListen() && selector != null) {
// this may block for a long time, upon return the
// selected set contains keys of the ready channels
try {
events();
socketTimeouts();
int n = selector.select(getTcpSelectorTimeout());
if (n == 0) {
//there is a good chance that we got here
//because the TcpReplicationThread called
//selector wakeup().
//if that happens, we must ensure that that
//thread has enough time to call interestOps
// synchronized (interestOpsMutex) {
//if we got the lock, means there are no
//keys trying to register for the
//interestOps method
// }
continue; // nothing to do
}
// get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.socket().setReceiveBufferSize(getRxBufSize());
channel.socket().setSendBufferSize(getTxBufSize());
channel.socket().setTcpNoDelay(getTcpNoDelay());
channel.socket().setKeepAlive(getSoKeepAlive());
channel.socket().setOOBInline(getOoBInline());
channel.socket().setReuseAddress(getSoReuseAddress());
channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
channel.socket().setTrafficClass(getSoTrafficClass());
channel.socket().setSoTimeout(getTimeout());
Object attach = new ObjectReader(channel);
registerChannel(selector,
channel,
SelectionKey.OP_READ,
attach);
}
// is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
} else {
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
// remove key from selected set, it's been handled
it.remove();
}
} catch (java.nio.channels.ClosedSelectorException cse) {
// ignore is normal at shutdown or stop listen socket
} catch (CancelledKeyException nx) {
log.warn("Replication client disconnected, error when polling key. Ignoring client.");
} catch (Throwable x) {
try {
log.error("Unable to process request in NioReceiver", x);
}catch ( Throwable tx ) {
//in case an out of memory error, will affect the logging framework as well
tx.printStackTrace();
}
}
}
serverChannel.close();
if (selector != null)
selector.close();
}
protected void registerChannel(Selector selector,
SelectableChannel channel,
int ops,
Object attach) throws Exception {
if (channel == null)return; // could happen
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops, attach);
}
public void run() {
try {
listen();
} catch (Exception x) {
log.error("Unable to run replication listener.", x);
}
}
protected void readDataFromSocket(SelectionKey key) throws Exception {
NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
// thread becomes available, the thread pool itself has a waiting mechanism
// so we will not wait here.
if (log.isDebugEnabled())
log.debug("No TcpReplicationThread available");
} else {
// invoking this wakes up the worker thread then returns
worker.serviceChannel(key);
}
}
线程被启动,run执行,ServerSocketChannel和Selector被创建,绑定端口并且注册accept事件。while循环,调用selector的select方法。并且设定阻塞事件,这样既不会在没有请求需要处理的时候线程一直阻塞,也不会不停loop一直占用cpu。当请求到达,就会有acceptable的SelectionKey,主线程会循环处理完这些SelectionKey。获取SocketChannel并将其注册到selector,注册read事件。待后续select被调用会取出这些readable的SelectionKey。调用readDataFromSocket方法,从线程池获取worker来处理请求。
再看NioReplicationThread的实现
public class NioReplicationThread extends WorkerThread {
private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );
private ByteBuffer buffer = null;
private SelectionKey key;
private int rxBufSize;
private NioReceiver receiver;
public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
{
super(callback);
this.receiver = receiver;
}
// loop forever waiting for work to do
public synchronized void run() {
this.notify();
if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
buffer = ByteBuffer.allocateDirect(getRxBufSize());
}else {
buffer = ByteBuffer.allocate (getRxBufSize());
}
while (isDoRun()) {
try {
// sleep and release object lock
this.wait();
} catch (InterruptedException e) {
if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
// clear interrupt status
Thread.interrupted();
}
if (key == null) {
continue; // just in case
}
if ( log.isTraceEnabled() )
log.trace("Servicing key:"+key);
try {
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader == null ) {
if ( log.isTraceEnabled() )
log.trace("No object reader, cancelling:"+key);
cancelKey(key);
} else {
if ( log.isTraceEnabled() )
log.trace("Draining channel:"+key);
drainChannel(key, reader);
}
} catch (Exception e) {
//this is common, since the sockets on the other
//end expire after a certain time.
if ( e instanceof CancelledKeyException ) {
//do nothing
} else if ( e instanceof IOException ) {
//dont spew out stack traces for IO exceptions unless debug is enabled.
if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
} else if ( log.isErrorEnabled() ) {
//this is a real error, log it.
log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
}
cancelKey(key);
} finally {
}
key = null;
// done, ready for more, return to pool
getPool().returnWorker (this);
}
}
/**
* Called to initiate a unit of work by this worker thread
* on the provided SelectionKey object. This method is
* synchronized, as is the run() method, so only one key
* can be serviced at a given time.
* Before waking the worker thread, and before returning
* to the main selection loop, this key's interest set is
* updated to remove OP_READ. This will cause the selector
* to ignore read-readiness for this channel while the
* worker thread is servicing it.
*/
public synchronized void serviceChannel (SelectionKey key) {
if ( log.isTraceEnabled() )
log.trace("About to service key:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
this.key = key;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
this.notify(); // awaken the thread
}
/**
* The actual code which drains the channel associated with
* the given key. This method assumes the key has been
* modified prior to invocation to turn off selection
* interest in OP_READ. When this method completes it
* re-enables OP_READ and calls wakeup() on the selector
* so the selector will resume watching this channel.
*/
protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
reader.setLastAccess(System.currentTimeMillis());
reader.access();
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
// loop while data available, channel is non-blocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // make buffer readable
if ( buffer.hasArray() )
reader.append(buffer.array(),0,count,false);
else
reader.append(buffer,count,false);
buffer.clear(); // make buffer empty
//do we have at least one package?
if ( reader.hasPackage() ) break;
}
int pkgcnt = reader.count();
if (count < 0 && pkgcnt == 0 ) {
//end of stream, and no more packages to process
remoteEof(key);
return;
}
ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
for ( int i=0; i<msgs.length; i++ ) {
/**
* Use send ack here if you want to ack the request to the remote
* server before completing the request
* This is considered an asynchronized request
*/
if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
try {
Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
}catch ( Throwable t ) {}
}
//process the message
getCallback().messageDataReceived(msgs[i]);
/**
* Use send ack here if you want the request to complete on this
* server before sending the ack to the remote server
* This is considered a synchronized request
*/
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
}catch ( RemoteProcessException e ) {
if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}
if ( getUseBufferPool() ) {
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
msgs[i].setMessage(null);
}
}
if (count < 0) {
remoteEof(key);
return;
}
}
private void remoteEof(SelectionKey key) {
// close channel on EOF, invalidates the key
if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
cancelKey(key);
}
protected void registerForRead(final SelectionKey key, ObjectReader reader) {
if ( log.isTraceEnabled() )
log.trace("Adding key for read event:"+key);
reader.finish();
//register our OP_READ interest
Runnable r = new Runnable() {
public void run() {
try {
if (key.isValid()) {
// cycle the selector so this key is active again
key.selector().wakeup();
// resume interest in OP_READ, OP_WRITE
int resumeOps = key.interestOps() | SelectionKey.OP_READ;
key.interestOps(resumeOps);
if ( log.isTraceEnabled() )
log.trace("Registering key for read:"+key);
}
} catch (CancelledKeyException ckx ) {
NioReceiver.cancelledKey(key);
if ( log.isTraceEnabled() )
log.trace("CKX Cancelling key:"+key);
} catch (Exception x) {
log.error("Error registering key for read:"+key,x);
}
}
};
receiver.addEvent(r);
}
private void cancelKey(final SelectionKey key) {
if ( log.isTraceEnabled() )
log.trace("Adding key for cancel event:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) {
reader.setCancelled(true);
reader.finish();
}
Runnable cx = new Runnable() {
public void run() {
if ( log.isTraceEnabled() )
log.trace("Cancelling key:"+key);
NioReceiver.cancelledKey(key);
}
};
receiver.addEvent(cx);
}
/**
* send a reply-acknowledgement (6,2,3)
* @param key
* @param channel
*/
protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
try {
ByteBuffer buf = ByteBuffer.wrap(command);
int total = 0;
while ( total < command.length ) {
total += channel.write(buf);
}
if (log.isTraceEnabled()) {
log.trace("ACK sent to " + channel.socket().getPort());
}
} catch ( IOException x ) {
log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
}
}
public void setRxBufSize(int rxBufSize) {
this.rxBufSize = rxBufSize;
}
public int getRxBufSize() {
return rxBufSize;
}
}
基本上就是获取SocketChannel并处理,和bio中处理socket逻辑差不多。
在这里tomcat是将自己封装的ObjectReader绑定到SelectionKey上,来分步处理这个有状态的ObjectReader,ObjectReader中维护了XByteBuffer,可以从维护的pool中获取XByteBuffer而不是直接创建。这一点和WorkThread中维护ByteBuffer一样,不用频繁的创建,减小内存开销。tomcat的处理过程和《Java NIO》这本书中的例子大体一致,只是tomcat的处理更加完善。
分享到:
相关推荐
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
JavaScript
JavaScript
北邮大三物流工程物流信息系统课程设计
0520_1.mov
使用C#开发的,一款非常简单的二维CAD绘图程序。 支持多种图元:线段、构造线、射线、多段线、圆、圆弧、文字。 缩放和平移视图。 支持图层。 图元支持夹点,并且可以通过移动夹点来修改图元。 捕捉。目前支持的捕捉类型有:端点、中点、中心点、象限点。 基本的编辑操作:删除、复制、镜像、偏移、移动。 撤销和重做。 支持点选和框选来选择图元。
作为AspectJ编译器的一部分,aspectj-weaver.jar主要有以下作用: 切面织入:aspectj-weaver.jar可以将定义好的切面织入到Java应用程序的字节码中,实现横切关注点的模块化aspectjweaver.jar是AspectJ编织器的主要库文件,它提供了AspectJ编织器的核心功能。它可以在编译时或运行时将AspectJ切面(aspects)编织到Java类中,实现面向切面
JavaScript
JavaScript
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
Android的移动应用下拉通知效果源码.rar
变电站呼吸器硅胶体破损数据集,数据总共106张图片,标注为VOC格式
JavaScript
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
JavaScript
移动应用Android 腾讯微博客户端源码.rar
da_1716184227697..apk.1.1.1
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
avalon-framework-api-4.3.1.jar ======Avalon的组件模型一组联合的经典接口组成,用来定义容器和组件之间交换的物件。容器中的组件的需求使用和组件相关的元信息描述来表示。接口和缺省实现由Avalon ====== jeeplus需要用到的包
固定资产管理系统是对高校固定资产的一个信息化管理系统,基本功能包括:对固定资产的购进、接触、销毁,对物品的使用状态、借出状态、库存状态等进行标识,对各类物品进行编号,根据编号进行查询,根据名称进行查询等。本系统结构如下: (1)系统登录: 用户登录模块:登录功能 重置 (2)系统用户管理: 对系统用户的增加 系统用户的权限修改 系统用户的删除 分配系统用户的权限 修改本身登录密码 资产的相关维护 (3)员工信息管理: 教工的增加、修改、删除、查询 (4)资产入库管理: 资产的录入 资产的属性修改 资产的报废删除 资产的属性查询 (5)资产维护管理: 物资的维修、维护物资的信息修改 (7)资产借还管理: 增加借出资产 查询借出资产 归还已借出资产 (8)打印报表