`

Tomcat的Socket实现:org.apache.tomcat.util.net(一)

    博客分类:
  • J2SE
阅读更多
org.apache.tomcat.util.net包的内容都与网络连接和socket有关,比较主要和常见的是JIOEndpoint这个类,前面提到Coyote连接器的时候,就有涉及到JIOEndpoint,它用于监听某个socket端口、将socket对象交给coyote,并提供基本的线程池功能。除了JIOEndpoint,还有AprEndpoint、NioEndpoint等。由于对apr和nio不熟悉,所以只研究了一下JIOEndpoint

org.apache.tomcat.util.net.JIoEndpoint
JIOEndpoint其实和我们本科时上计算机网络或者分布式系统,做实验写的socket服务器差不多,结构也是经典的“Listen-Accept-Handle”,这里简单描述一下:JIOEndpoint使用JDK的ServerSocket类监听某个端口,有socket连接进来的时候便返回一个socket对象,交给专门的处理器。当然,具体的实现没那么简单,下面会按照socket的处理过程,详细说明其中的机理。

初始化
public void init()
        throws Exception {

        if (initialized)
            return;
        // Initialize thread count defaults for acceptor
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        if (serverSocketFactory == null) {
            serverSocketFactory = ServerSocketFactory.getDefault();
        }
        if (serverSocket == null) {
            try {
                if (address == null) {
                    serverSocket = serverSocketFactory.createSocket(port, backlog);
                } else {
                    serverSocket = serverSocketFactory.createSocket(port, backlog, address);
                }
            } catch (BindException be) {
                throw new BindException(be.getMessage() + ":" + port);
            }
        }
        //if( serverTimeout >= 0 )
        //    serverSocket.setSoTimeout( serverTimeout );
        initialized = true;
    }

在这里,利用serverSocketFactory新建了一个serverSocket对象,用于监听特定的端口

启动JIOEndpoint
// Create worker collection
            if (executor == null) {
                workers = new WorkerStack(maxThreads);
            }

            // Start acceptor threads
            for (int i = 0; i < acceptorThreadCount; i++) {
                Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
                acceptorThread.setPriority(threadPriority);
                acceptorThread.setDaemon(daemon);
                acceptorThread.start();
            }

这里有几个类:worker,workerStack,Acceptor。这些都是JIOEndpoint的一些内部类,下面按照处理顺序依次讲述

Acceptor内部类
Acceptor实现了Runnable接口,只有一个方法run,做的事情就是通过ServerSocket.accept方法,得到socket,然后调用JIOEndpoint的processSocket方法

protected boolean processSocket(Socket socket) {
        try {
            if (executor == null) {
                getWorkerThread().assign(socket);
            } else {
                executor.execute(new SocketProcessor(socket));
            }
        } catch (Throwable t) {
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

在这里,JIOEndpoint有两种处理socket的方式:使用JDK5的executor,或者内部的worker类。executor怎么用大家可以直接翻书了,我们继续讨论第二种方法。

首先我们要通过getWorkerThread()方法,得到一个Worker对象。具体逻辑是,看看WorkerStack(存放所有worker的一个堆栈)里面有没有空余的worker,有则直接拿来用,无则看看能不能新建一个worker线程,假如不能(比如超出了最大线程数限制),则返回null,那样就只能委屈一下这个acceptor,稍微等一下了(wait()方法),直到有新的worker可用时,通过notify方法唤醒等待的acceptor

     /**
     * Return a new worker thread, and block while to worker is available.
     */
    protected Worker getWorkerThread() {
        // Allocate a new worker thread
        Worker workerThread = createWorkerThread();
        while (workerThread == null) {
            try {
                synchronized (workers) {
                    workers.wait();
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            workerThread = createWorkerThread();
        }
        return workerThread;
    }

如下,当有worker被回收后,通知等待的acceptor

protected void recycleWorkerThread(Worker workerThread) {
        synchronized (workers) {
            workers.push(workerThread);
            curThreadsBusy--;
            workers.notify();
        }
    }



ok,回到前面的processSocket方法,得到worker后,通过worker.assign方法,将socket对象传递给worker

Worker内部类
Worker也实现了runnable接口,有三个方法:assign、await、start和run

所谓的start方法,就是new一个Thread对象,把worker自己传进去,我们知道这个thread就开始执行run方法了。

public void run() {

            // Process requests until we receive a shutdown signal
            while (running) {

                // Wait for the next socket to be assigned
                Socket socket = await();
                if (socket == null)
                    continue;

                // Process the request from this socket
                if (!setSocketOptions(socket) || !handler.process(socket)) {
                    // Close socket
                    try {
                        socket.close();
                    } catch (IOException e) {
                    }
                }

                // Finish up this request
                socket = null;
                recycleWorkerThread(this);

            }

        }

run方法首先调用await方法

private synchronized Socket await() {

            // Wait for the Connector to provide a new Socket
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Notify the Connector that we have received this Socket
            Socket socket = this.socket;
            available = false;
           notifyAll();

            return (socket);

        }

通过标记位available,如果当前的worker是“非available”的,则线程会开始等待。直到我们调用的assign方法,把一个可用的socket给worker后,才会notifyall,唤醒一个线程,进而取得assign过来的socekt

synchronized void assign(Socket socket) {

            // Wait for the Processor to get the previous Socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }

            // Store the newly available Socket and notify our thread
            this.socket = socket;
            available = true;
            notifyAll();

        }

所以,assign和await方法相当于生产者和消费者方法,两者通过available进行互斥,而this.socket则相当于被竞争的资源

现在,又回到Worker.run方法。在取得socket后,通过setSocketOptions(socket)方法设置socket的相关选项(例如超时值),最后通过handler.process(socket),终于把socket这个接力棒交给coyote了!

handler是什么?就是个简单的接口,如下:

/**
     * Bare bones interface used for socket processing. Per thread data is to be
     * stored in the ThreadWithAttributes extra folders, or alternately in
     * thread local fields.
     */
    public interface Handler {
        public boolean process(Socket socket);
    }

回顾一下org.apache.coyote.http11.Http11Protocol 的Http11ConnectionHandler内部类,实现的正是这个接口

分享到:
评论

相关推荐

    Apache服务器错误问题

    Apache服务器错误问题 Apache服务器错误问题 Apache服务器错误问题

    java head space.txt

    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1468) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent....

    struts2驱动包

    at org.apache.catalina.util.LifecycleSupport.fireLifecycleEvent(LifecycleSupport.java:117) at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1053) at org.apache.catalina.core....

    解决struts2下载异常的jar包 struts2-sunspoter-stream-1.0.jar

    588) at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489) at java.lang.Thread.run(Thread.java:662) 网络解决办法: (虽然该办法可行,但是本人并不提倡。具体原因在之后解释。...

    SocketInputStream.java

    * $Header: /home/cvs/jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/util/StringManager.java,v 1.2 2001/07/22 20:25:14 pier Exp $ * $Revision: 1.2 $ * $Date: 2001/07/22 20:25:14 $ * * =...

    apr-1.4.6.tar.bz2

    安装Apache的时候,为什么要安装apr和apr-util呢 要测APR给tomcat带来的好处最好的方法是在慢速网络上(模拟Internet),将Tomcat线程数开到300以上的水平,然后模拟一大堆并发请求。如果不配APR,基本上300个线程狠...

    JAVA上百实例源码以及开源项目

    一个简单的CS模式的聊天软件,用socket实现,比较简单。 凯撒加密解密程序 1个目标文件 1、程序结构化,用函数分别实现 2、对文件的加密,解密输出到文件 利用随机函数抽取幸运数字 简单 EJB的真实世界模型(源代码...

    JAVA上百实例源码以及开源项目源代码

    一个简单的CS模式的聊天软件,用socket实现,比较简单。 凯撒加密解密程序 1个目标文件 1、程序结构化,用函数分别实现 2、对文件的加密,解密输出到文件 利用随机函数抽取幸运数字 简单 EJB的真实世界模型(源代码...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【多线程】简述synchronized 和java.util.concurrent.locks.Lock的异同? 90 【线程】ThreadLocal的作用 90 【Spring】什么是IOC和DI?DI是如何实现的 91 【Spring】spring中的IOC(控制反转)的原理 92 【Spring】...

Global site tag (gtag.js) - Google Analytics