`
沉沦的快乐
  • 浏览: 55812 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

并发编程之线程池ThreadPoolExecutor原理探究

阅读更多

        当前越来越多的系统使用多线程来处理任务,但是为每一个任务创建线程并不是合理的方案,原因有2点:一是创建线程的开销很大,一个任务一个线程的方式会有性能上的损失;二是可能导致线程数量的膨胀,不但不易于线程的管理,还可能导致内存被消耗完,导致out of memory(OOM),从而使系统崩溃。为了解决这个问题,线程池应运而生。线程池有两个作用:一个是限制线程的数量,不会导致线程的膨胀;二是线程复用,线程执行完一个人任务之后,可以接着执行下一个任务,减少了创建线程的开销。

       java中一个运用非常普遍的线程池是ThreadPoolExecutor。下面来探究下ThreadPoolExecutor的功能和实现原理。

 

ThreadPoolExecutor的功能

 

       1.自定义线程池的核心线程数和最大线程数。如果当前池中的线程数小于核心线程数,则直接为任务创建新线程来执行,如果前池中的线程数大于核心线程数,则把任务放入任务队列中,等待线程池中已有的线程去执行。如果任务队列满了,但是池中的线程数小于最大线程数,则创建新线程执行任务。如果任务队列满了,池中的线程数等于最大线程数,那么执行拒绝任务策略。

      2.可配置拒绝任务策略,ThreadPoolExecutor自带了四种拒绝策略:丢弃当前将要加入队列的任务本身(DiscardPolicy),丢弃任务队列中最旧任务(DiscardOldestPolicy),抛出异常的方式(AbortPolicy),任务交由调用者线程去执行(CallerRunsPolicy),除了自带的策略之外,用户还可以自定义策略。

     3.线程声明周期管理。如果线程空闲时间超过了配置的时间keepAliveTime,则线程将被销毁。

     4.配置线程工厂,用户可以自定义创建线程的工厂。

    5.配置阻塞队列类型。

    6.线程池生命周期管理。可以强制shutdown线程池,也可以优雅shutdown线程池。

     为了实现上面的配置管理。ThreadPoolExecutor提供了不同的创建线程池的构造方法,用户可以根据自身实际情况选择。

 

ThreadPoolExecutor实现原理

 ThreadPoolExecutor的属性

属性名 属性说明
volatile int runState

runState主要提供了生命周期的控制,下面是主要的状态:

RUNNING:0。接收新任务以及处理队列中的任务

SHUTDOWN:1。不再接收新任务,但是处理队列中的任务

STOP:2。不再接收新任务,也不处理队列中的任务,同时中断正在执行的任务

TERMINATED:3。跟STOP相同,同时所有的线程都终止了。

BlockingQueue<Runnable> workQueue 任务队列
ReentrantLock mainLock 为poolSize, corePoolSize,maximumPoolSize, runState, and workers属性的set提供同步。
HashSet<Worker> workers 保存线程池中所有的工作线程,只有获得mainLock锁才能访问
volatile long  keepAliveTime 空闲线程的最大存活时间
volatile boolean allowCoreThreadTimeOut 核心线程是否也支持最大存活时间管理
volatile int corePoolSize 线程池核心线程数
volatile int   maximumPoolSize 线程池最大线程数
volatile int   poolSize 线程池当前线程数
int largestPoolSize 线程池峰值线程数
long completedTaskCount 线程池总共处理的任务数
volatile RejectedExecutionHandler handler 任务拒绝策略
volatile ThreadFactory threadFactory 创建线程工厂

      ThreadPoolExecutor的属性,基本上大部分都是构造函数中可配置的,也说明了ThreadPoolExecutor的灵活性。不过通过上面的表大家可能会有点疑惑:怎么没有保存Thread对象集合的属性?不要急,大家应该发现了里面有个HashSet<Worker> workers属性。这个集合里Worker对象是ThreadPoolExecutor定义的一个内部类,它包含了thread对象。现在我们来看下Worker对象包含的属性。

 

Worker对象的属性

属性名称 属性说明
inal ReentrantLock runLock 这个锁的作用是保护取消worker线程的中断,而不是中断正在执行的任务。
Runnable firstTask 由于线程池创建现在的时候都是为某个任务创建,所以该属性就是记录该刚线程创建时执行的任务
long completedTasks 这个线程执行的任务数
Thread thread 本worker运行的线程
volatile boolean hasRun 本worker对象运行的线程是否执行过该worker的run方法。只有hasRun为true时worker的线程才能被中断。

 

ThreadPoolExecutor的任务处理流程

       前面介绍了ThreadPoolExecutor的属性以及需要用到的内部类,素材有了,那么下面来看看是如何来把素材加工成成品的吧。在用户创建完线程池之后,需要把任务提交给线程池,线程池提供了submit和execute方法来提交任务,而submit方法最终还是调用的execute方法,它只是把任务封装成futuretask,以便获得任务的返回值。对于没有返回值的任务直接用execute提交就可以了,如果有返回值的任务,用submit提交更好。所以提交任务的核心还是execute方法。现在就来看看execute的实现代码:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

        这段代码的主要逻辑如下:

       1.如果当选线程数大于等于核心线程数,则直接把任务放到任务队列里,等待已有的线程去执行它。如果当前选线程数小于核心线程数,则为该任务创建新的线程去执行它,这个的功能的实现方法是addIfUnderCorePoolSize(command)。addIfUnderCorePoolSize(command)方法的代码如下:

 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

       可以发现,这段源码是如果发现小于corePoolSize就会调用addThread()方法创建一个新的线程,并且调用线程的start()方法将线程运行起来。只有没有创建成功Thread才会返回false,也就是当当前的poolSize > corePoolSize的时候,或线程池已经不是在running状态的时候才会出现。execute对poolSize和corePoolSize的比较只是粗略判断,而addIfUnderCorePoolSize()内部是加锁后判定的,以得到更为准确的结果,而外部初步判定如果是大于了,就没有必要进入这段有锁的代码了。

 

       2.如果addIfUnderCorePoolSize返回false,说明没有为任务创建线程(原因可能是线程池不是RUNNING状态,或者poolsize大于corepoolsize了)。则需要把任务存放到任务队列中。

       3.在任务放到队列之前,先初步判断下此时线程池的状态。如果是running才接受新任务,否则addIfUnderMaximumPoolSize方法精确线程池状态。

       4.如果任务可以添加到任务队列,则判调用队列的offer方法,往队列末尾加入任务。由于队列是一个自定义的阻塞队列,可以是有界也可以是无界的。如果加入队列成功,还有先判断下runState != RUNNING || poolSize == 0。前面判断了状态之后为什么还要判断呢?这是因为有时间差,状态随时可以发生改变。记住了这一点在看这样一堆状态判断就不会难以理解了。好了,如果线程池不是RUNNING状态或线程池里没有线程了,则执行ensureQueuedTaskHandled方法处理任务。ensureQueuedTaskHandled的代码如下:

private void ensureQueuedTaskHandled(Runnable command) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean reject = false;
        Thread t = null;
        try {
            int state = runState;
            if (state != RUNNING && workQueue.remove(command))
                reject = true;
            else if (state < STOP &&
                     poolSize < Math.max(corePoolSize, 1) &&
                     !workQueue.isEmpty())
                t = addThread(null);
        } finally {
            mainLock.unlock();
        }
        if (reject)
            reject(command);
    }

    这段代码是处理拒绝任务的。这里也会加锁来锁定当前的状态和工作队列。如果状态确实不等于running,则把任务从任务列表中移除并执行拒绝策略。如果任务remove失败,并且当前状态为running和shutdown状态,任务队列不为空,并且poolSize小于Math.max(corePoolSize, 1)。则调用addThread为线程池创建一个新线程。但是这个任务并没有直接给新线程执行。为什么要判断poolSize小于Math.max(corePoolSize, 1),因为corePoolSize可以设置为0.当corePoolSize=0时,需要至少有1个线程去执行任务。前面的几个方法中出现了几次创建addThread的方法,现在来看看这个方法做了哪些事情:

 

private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            if (t.isAlive()) // precheck that t is startable
                throw new IllegalThreadStateException();
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start();
                workerStarted = true;
            }
            finally {
                if (!workerStarted)
                    workers.remove(w);
            }
        }
        return t;
    }

       这个方法的参数名firstTask可能比较难理解。这里详细说明一下:首先在线程池中,一个新线程的创建大多数情况都是为执行某个任务而创建的,这个任务不会加入任务队列,而是通过firstTask传给为他而建的新线程去执行。所以这个任务也就是这个线程执行的第一个任务。如果firstTask设为null,那么线程将去执行任务队列中的任务。下面来分析这个方法的功能,首先先创建一个worker对象,把firstTask初始化这个worker对象。然后通过线程工厂创建一个线程,并检查这个线程的状态,同时跟新线程池的峰值线程数的值。需要注意的是,这个线程不是属于某个具体任务的,而是属于这个worker的,即该线程不是执行某个任务的run,而是执行这个worker的run。最后把worker对象添加到worker队列里面。所以发到这里可以明白了ThreadPoolExecutor为什么没有thread的集合属性了。

 

      5.第4点阐述了任务加入队列成功的情况,但是如果队列满了加入队列也可能失败。这时候会去尝试创建新线程来执行该任务。即执行addIfUnderMaximumPoolSize方法。这个方法与addIfUnderCorePoolSize基本一致,只是后者是拿poolSize跟corePoolSize比较,而前者是poolSize跟maximumPoolSize比较。如果addIfUnderMaximumPoolSize方法为任务创建线程失败,则执行拒绝策略来处理这个任务。

      到目前为止,前面讲的5个步骤将了一个任务提交给线程池之后是如何处理的。但是细心的用户可能发现,里面缺失了非常重要的一个功能:任务被添加到任务队列之后是如何被线程池处理掉的?线程处理完它的首个任务之后是如何获取新任务的呢?线程池是不是有类似Timer一样的守护进程不断扫描线程队列和等待队列?还是利用某种锁机制,实现类似wait和notify实现的? 别急。下面来揭开它的神秘面纱。

       前面提到了ThreadPoolExecutor的内部类Worker,也在介绍addThread方法的时候提到了线程池的线程是和Worker对象绑定在一起的。所以现在来看看Worker类做了什么事情?通过代码发现Worker的定义也是一个Runnable。addthread方法中调用了这个Worker的start()方法,也就是线程的启动方法,其实也就是调用了Worker的run()方法。现在来看看worker的run方法做了什么事情:

 

public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }
         woker的run方法主要是通过while循环不断调用getTask()方法去获取任务。然后执行runTask(task)方法来执行任务,最后调用workerDone()方法来执行一些清除操作。

 

         runTask(task)其实做的事情很简单:它的核心就是调用任务的run方法来执行真正的用户任务,除此之外还执行了任务执行前后需要的一些操作,以及统计一下这个worker完成的任务数。这个方法不需要深究,代码也比较简单:

 

private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if ((runState >= STOP ||
                    (Thread.interrupted() && runState >= STOP)) &&
                    hasRun)
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }
          worker的润方法真正的核心是如果不断获取任务的。所以这里必须认真解读下getTask()方法,下面是getTask()的代码:

 

 

Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
        你会发现getTask()方法是从workQueue队列中,也就是等待队列中获取一个任务出来并返回!如果没有获得任务,则通过 interruptIdleWorkers()方法去关闭空闲时间超过阈值的空闲线程。

 

       至此,完整的ThreadPoolExecutor线程池处理任务的原理就解读完毕了。其他的一些诸如关闭线程池和获取线程池的状态和统计信息等的接口都比较简单,这里就不一一解释了。

 

 ThreadPoolExecutor引申出来的几种常见线程池

 

1.newSingleThreadExecutor。

     只有一个线程的线程池,即corePoolSize和maximumPoolSize都等于1。

2.newCachedThreadPool

    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。在newCachedThreadPool构造参数中,corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,即可与无限制的创建线程。但是它使用的是阻塞队列是SynchronousQueue。这个队列比较奇葩,虽然他是无界的,但是里面只能有一个元素。在添加一个任务的时候,必须要有一个线程正在等待一个任务。即通过这个阻塞队列,既可以保证任务能够马上得到线程去运行,同时又能重用已有的空闲线程。

3.newFixedThreadPool

   线程数固定的线程池,即corePoolSize=maximumPoolSize。当线程数达到了corePoolSize时,不能创建新的线程了,所以新的任务只能放到任务队列中,因此这个线程池用的阻塞队列是无界队列LinkedBlockingQueue。

4.ScheduledThreadPoolExecutor

  可以执行延迟固定时间的任务,也可以执行定时任务的线程池。ScheduledThreadPoolExecutor的底层不是基于ThreadPoolExecutor实现的,它有一个自己的实现类。下次详细介绍下ScheduledThreadPoolExecutor的实现原理。

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics