`

Tomcat接收请求

 
阅读更多

 

1. 启动Tomcat后, 在哪里接收Request?

      启动Tomcat时,部署完webapp 后,就会启动connector ,启动这个连接器,也就意味着会启动一个线程来接收请求,

   具体涉及的类:

    Http11Protocol

     org.apach.tomcat.uitl.net ,

    JIoEndPoint , WorkStack (pool), Acceptor(Runnable) 

 

  首先,启动Http11Protocol会调用JIoEndpoint中的start();JIO会new监听器线程,监控连接;

 public void start()
        throws Exception {
        // Initialize socket if not done before
        if (!initialized) {
            init();
        }
        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if (executor == null) {
                workers = new WorkerStack(maxThreads);
            }

            // Start acceptor threads
            for (int i = 0; i < acceptorThreadCount; i++) {
                Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
                acceptorThread.setPriority(threadPriority);
                acceptorThread.setDaemon(daemon);
                acceptorThread.start();
            }
        }
    }

 Acceptor(实现Runnable接口) 的 run() :启动http11Protocol ,就会阻塞在acceptSocket()上,直到有连接传入,得到Socket , 传递给Processor

   

 /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         * 监控连接,并把他们交给processor处理
         */
        public void run() {
            // Loop until we receive a shutdown command
            // 只要不是shutdown 命令 他就一直在运行 等候请求传入
            while (running) {
                // Loop if endpoint is paused
                while (paused) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                // Accept the next incoming connection from the server socket
                try { // 阻塞在这个地方 直到有连接传入
                    Socket socket = serverSocketFactory.acceptSocket(serverSocket);
                    serverSocketFactory.initSocket(socket);
                    // Hand this socket off to an appropriate processor
                    if (!processSocket(socket)) { // 处理socket
                        // Close socket right away
                        try {
                            socket.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                }catch ( IOException x ) {
                    if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
                // The processor will recycle itself when it finishes
            }
        }
    }

 processSocket() : 取得线程,处理Socket

 /**
     * Process given socket.
     */
    protected boolean processSocket(Socket socket) {
        try {
            if (executor == null) { // exector是线程池
                getWorkerThread().assign(socket); 
            } else {
                executor.execute(new SocketProcessor(socket));// SocketProcess 实现了Runnable接口 ,process处理!
            }
        } catch (Throwable t) {
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }
    

   可能你会举得到这里,如果不通过线程池处理,那么是怎么把request传递给servlet的了?

   通过分析下面的源码,好像getWorkThread().assign(socket) 并没有调用处理socket的方法?

    注意createWorkThread()中的“newWorkerThread() ” , 通过这个方法,我们会创建一个线程,并且会启动,Work类的源代码 它是实现了Runnable接口的!

   所以,总结Tomcat接收一个请求的过程 :

     Tomcat启动,会启动一个Acceptor线程来监听请求 , 请求传入了时,得到相应的Socket,然后通过workStack 或者 Executor的方式,产生一个新的线程来处理,

     在通过workStack 产生线程到处理的过程 : getWorkerThread() 调用createWorkerThread() 调用 newWorkerThread会产生一个Work thread , 并且start,但是

     因为socket = await() , avaliable = false , 会阻塞,通过调用 assign() , 使得available = true 把最新的socket传递进去, run()方法运行,处理socket , 这就是assign()的作用。

     但是 , 为什么要这么做了,直接new 出线程,然后run就是,就像executor一样!为什么这中间要添加这些额外的操作了?

       为了保证线程得到的是最新的socket!!

 newWorkerThread() :

 

 /**
     * Create and return a new processor suitable for processing HTTP
     * requests and returning the corresponding responses.
     */
    protected Worker newWorkerThread() {

        Worker workerThread = new Worker();
        workerThread.start(); //注意这里!!!!
        return (workerThread);

    }

  

getWrokThread() : // 不通过线程池的方式取得线程,将线程放在一个workStack中

  

 /**
     * Return a new worker thread, and block while to worker is available.
     */
    protected Worker getWorkerThread() {
        // Allocate a new worker thread
        synchronized (workers) { // 注意同步
            Worker workerThread;
            while ((workerThread = createWorkerThread()) == null) {
                try {
                    workers.wait(); // 如果不能取的线程,wait,因为你在server.xml中配置了maxThread;
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            return workerThread;
        }
    }

  

Work内部类: // 一个处理rquest的线程就是一个worker!

   里面两个特别重要的方法,assign()  和 await() 都是同步

   assgin 传递最新的socket!

   而 await 则是在等候最新的socket!

 protected class Worker implements Runnable {
        protected Thread thread = null;
        protected boolean available = false;
        protected Socket socket = null;
        // 将availabe 置为 true , 并唤醒所有等待的线程我知道了,assign的作用就是把最新的socket传递进来
        synchronized void assign(Socket socket) {
            // Wait for the Processor to get the previous Socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }
            // Store the newly available Socket and notify our thread
            this.socket = socket;
            available = true;
            notifyAll();
        }
// 等候最新的socket
        private synchronized Socket await() {
            // Wait for the Connector to provide a new Socket
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }
            // Notify the Connector that we have received this Socket
            Socket socket = this.socket;
            available = false;  // 将available置为false!
            notifyAll();
            return (socket);
        }

// run方法
        public void run() {
            // Process requests until we receive a shutdown signal
            while (running) {
                // Wait for the next socket to be assigned
                Socket socket = await(); // 调用这个方法时,除非调用assign()方法使的available为true,否则就会一直等待!而且现在锁已经加上来了,其他线程是不能够进来了!
                if (socket == null) //
                    continue;
                // Process the request from this socket
                if (!setSocketOptions(socket) || !handler.process(socket)) { // 这两个方法跟线程池处理的一样
                    // Close socket
                    try {
                        socket.close();
                    } catch (IOException e) {
                    }
                }
                // Finish up this request
                socket = null;
                recycleWorkerThread(this);
            }
        }

        public void start() {
            thread = new Thread(this); 
            thread.setName(getName() + "-" + (++curThreads));
            thread.setDaemon(true);
            thread.start();
        }
    }

  

createWorkThread() ; 创建一个新的线程,调用newWorkerThread() 启动线程

/**
     * Create (or allocate) and return an available processor for use in
     * processing a specific HTTP request, if possible.  If the maximum
     * allowed processors have already been created and are in use, return
     * <code>null</code> instead.
     */
    protected Worker createWorkerThread() {

        synchronized (workers) {
            if (workers.size() > 0) { // 线程栈,可用的线程
                curThreadsBusy++;
                return workers.pop();
            }
            if ((maxThreads > 0) && (curThreads < maxThreads)) {
                curThreadsBusy++;
                if (curThreadsBusy == maxThreads) {
                    log.info(sm.getString("endpoint.info.maxThreads",
                            Integer.toString(maxThreads), address,
                            Integer.toString(port)));
                }
                return (newWorkerThread()); // 注意这个方法!!
            } else {
                if (maxThreads < 0) {
                    curThreadsBusy++;
                    return (newWorkerThread());
                } else {
                    return (null);
                }
            }
        }

    }

 

 

 SocketProcess的Run() :// 处理socket ,

public void run() {
            // Process the request from this socket
            if (!setSocketOptions(socket) || !handler.process(socket)) { // handler 处理socket 通过这个方法就会将request传递给servlet
                // Close socket
                try {
                    socket.close();
                } catch (IOException e) {
                }
            }
            // Finish up this request
            socket = null;
        }

  2. 接收请求后,是怎么传递给相应的servlet处理的?

     通过调用org.apache.coyote.http11.http11Protocol.process(socket) 方法!!

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics