`
yychao
  • 浏览: 97778 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ThreadPoolExecutor分析

 
阅读更多

ThreadPoolExecutor 状态:

 

RUNNING:接受新任务,并处理队列的任务

SHUTDOWN:不接受新任务,但是处理等待队列的任务

STOP:不接受新任务,不处理等待队列的任务,并且中断正在执行的任务

TERMINATED:和STOP相同,并附加所有线程已终结

 

状态转换:

 

RUNNING --> SHUTDOWN :  调用shutdown(),或者再finalize()方法中隐式调用shutdown();

 

RUNNING/SHUTDOWN --> STOP : 调用shutdownNow()

 

SHUTDOWN --> TERMINATED : 队列和线程池都为空

 

STOP --> TERMINATED : 线程池为空

 

ThreadPoolExecutor任务执行步骤:

 


 

ThreadPoolExecutor 关闭操作:

 

shutdown(): 修改runStatus=SHUTDOWN,并中断空闲线程(等待getTask返回)并退出;若没有任何线程或任务,则调用tryTerminate尝试关闭线程池;否则,最后一个worker线程关系线程池

 

步骤:

 

  1. SecurityManager检查是否有关闭线程权限
  2. 如果通过第一步,检测SecurityManager对每个线程是否特殊对待,通过检查后,修改runStatus=SHUTDOWN
  3. 如果1、2都通过但interrupt()线程时抛出SecurityException,此时,需要恢复runStatus状态;一些线程再non-shutdown状态被关闭,此时可能通过tryTerminate启动一个worker线程

PS:shutdown通过interruptIfIdle()终止空闲线程

 

shutdownNow():

 

与shutdown()不同点:

 

  1. runStatus设置为STOP
  2. 所有的工作线程被中断,而非空闲线程
  3. drain等待队列,并返回任务列表

Worker线程执行:

 

        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }

worker线程执行步骤:

 

  1. worker执行第一个提交任务,or从任务队列中获取任务(取得任务or等待任务返回)
  2. 执行任务,执行完毕后;重新获取任务
  3. 从getTask()返回null,线程终结;执行workerDone清理线程

任务获取:

 

    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
 

任务获取步骤:

 

  1. runStatus > SHUTDOWN时,返回null;终止线程
  2. runStatus == SHUTDOWN,workQueue.poll()返回任务队列任务
  3. poolSize > corePoolSize || allowCoreThreadTimeOut时,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),等待任务keepAliveTime时间
  4. 否则,等待任务队列workQueue.take()

如果上述2、3、4返回结果不为null,则返回任务;

否则判断线程时候可以退出:

 

    private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
    }

 任务获取退出:

 

  • runState >=STOP 或者 workQueue.isEmpty() 或者(allowCoreThreadTimeOut &&poolSize > Math.max(1, corePoolSize))
  • 当workerCanExit()为true 且runState >= SHUTDOWN时,唤醒空闲线程interruptIdleWorkers
  • 最后返回null 工作线程退出

线程执行任务:

 

 

        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * Ensure that unless pool is stopping, this thread
                 * does not have its interrupt set. This requires a
                 * double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }
 

任务执行之前、之后回调方法:beforeExecute、afterExecute,方便用户扩展

 

线程退出清理工作:

 

 

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

 线程退出处理步骤:

 

  1. 收集已完成任务数
  2. 线程缓存队列清理工作线程
  3. poolSize ==0时,tryTerminate()尝试关闭线程池
线程池终止操作:
    private void tryTerminate() {
        if (poolSize == 0) {
            int state = runState;
            if (state < STOP && !workQueue.isEmpty()) {
                state = RUNNING; // disable termination check below
                Thread t = addThread(null);
                if (t != null)
                    t.start();
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }
 线程池终止操作 在三个点调用:
  1. 最后一个线程退出的workerDone()中调用
  2. 直接调用shutdown(),且没有存活线程时
  3. 或者直接调用shutdownNow(),且没有存活线程时
线程池终止操作步骤:

  1. state < STOP && !workQueue.isEmpty(),设置state=RUNNING(关闭线程池终止操作),并新增一个活动线程,保证任务队列处理
  2. state == STOP || state == SHUTDOWN时,转换runState=TERMINATED;并回调terminated()方法

terminated()方法作为回调接口,可以扩展此接口,再线程池终止时,扩展操作

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics