ThreadPoolExecutor对任务的提交和异步执行已分析完毕,现在要补充一些关于它对线程池的管理,也就是对其工作线程的调度和回收.
还记得上一篇“ThreadPoolExecutor
execute 方法分析”最后一个关于任务异步执行的流程图,虽然分支庞杂,但只有两个条逻辑路径会增加工作线程加入到线程池:一是当前线程池的大小<核心线程池大小(即poolSize<corePoolSize),二是任务队列已满,并且poolSize <
maxPoolSize。参考“ThreadPoolExecutor execute 方法分析”最后的流程图,也就是图右半边两个执行步骤“将command加入核心线程池执行”和“尝试在最大线程池数量限制下执行”会增加新的工作线程。具体对应源代码://加入核心线程池执行
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
//在最大核心线程池数量限制下执行
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
上面两方法主要是调用了私有方法addThread(Runnable
firstTask),它是ThreadPoolExecutor中唯一一个增加工作线程到线程池的方法。关于该方法在“ThreadPoolExecutor
execute 方法分析”中已分析,所以这里主要是分析下当增加工作线程的生命周期。当t.start()以后,Worker线程开始工作,请看Worker.run()方法:public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
看while循环:
第一轮循环,task必定不等于null,因为是根据提交的任务才新增工作线程的(addThread(firstTask)),可以认为这个firstTask是该线程的“份内工作”,当他执行完份内工作,结束第一轮循环;
后续循环,会执行“分外工作”,每次都会让getTask()去获取一个待执行的task,然后执行runTask(task);周而复始,这便是它的工作。看来得细究两个方法:runTask(task)和getTask()。
先看runTask(task):
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
分析3点:
1) 该方法内部是加锁的,锁的原理会到java.util.concurrent.locks中再写,这里只说这个锁的作用。 这个锁是来保护当前线程(也就是thread对象)的,换句话说,该工作线程是不是空闲、是不是
回收,都是通过这个锁来判断的。
2) 正常执行目标任务之前,设计了两个钩子,beforeExecute和afterExecute,这个可以通过重写
来做些需要的事情。
3) 在beforeExecute之前的if语句有点晦涩,它主要是:在当前ThreadPoolExecutor状态小于STOP之
前,清除该线程中断标识;假如恰好是由shutdownNow中断了线程,那么该线程会重新中断,
任务也就取消、不会再执行了。
再看getTask():
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
这是一个无限循环,跳出有两种可能:
1) 是当前状态是STOP或TERMINATE,则返回null,这和上一篇“ThreadPoolExecutor
execute 方法
分析”讲到的在这两种状态下任务队列不会再执行是不谋而合的;
2) 是在任务队列中取到了待执行的任务并返回给工作线程执行;
3) 是如果当前线程满足了可以结束的条件,返回null。
关于它从任务队列中取任务的逻辑也值得思考:
A) 如果当前已经SHUTDOWN了,那意味着任务队列中不会再加入新的任务,所以直
接workQueue.poll()即可,无需等待,任务没有也就没有了;
B) 否则再看当前线程池的大小,若不大于核心线程池大小,那该线程就不该回收,所以
用workQueue.take()进行阻塞获取任务,该线程就会一直阻塞在这里直到任务队列中由了
新任务并被他取到;
C) 倘若不巧当前线程池大小已经大于核心线程池大小,那该线程是“可以”回收的,只不过
不是直接回收,而是设计了两个标识提供给客户端进行选择:
allowCoreThreadTimeOut和keepAliveTime。只有在前者为true的情况下,线程才可以
回收,并且只回收空闲时间已经达到keepAliveTime值的那些线程。对应到代码即:
allowCoreThreadTimeOut为true时,
workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)等待获取。
小结,已经看到了工作线程生命周期结束部分了,满足三个条件时候工作线程回收:
1) 参数allowCoreThreadTimeOut为true
2) 该线程在keepAliveTime时间内获取不到任务,即空闲这么长时间
3) 当前线程池大小 > 核心线程池大小corePoolSize
当然这只是对当前正在执行的一个工作线程的分析,并不是线程池回收的全部逻辑,其他逻辑
还有:
a) 在每个工作线程每次取不到任务的时候,检查发现可以跳出工作,并且当前状态已经 >=
SHUTDOWN, 中断所有空闲的工作线程。在ThreadPoolExecutor中,一般的,中断工作线程意味
着,该线程可能抛出中断异常(如果该线程在获取任务中阻塞),也可能不受影响(如果该线
程正在执行任务)。这里当然是前一种,因为这里是中断空闲线程,空闲线程总是在获取任务
队列时候阻塞着,从上面代码看该异常catch住但不做任何处理;和前面呼应下,如何判断当前
线程是空闲的呢?tryLock() ^_^
b) 调用ThreadPoolExecutor.shutdown(),会做两件事情,一设置当前状态为SHUTDOWN,使任务队列不
再增加新任务,这样当忙碌的工作线程将任务执行完之后,通过步骤1) 回收;二是将所有空闲的
工作线程中断;
c) 调用ThreadPoolExecutor.shutdownNow(),不管三七二十一,将所有工作线程中断,并返回还没
完成的任务集;
中断工作线程就是将其回收,看代码,不管是不是有中断异常,最终都将执行方法workerDone(worker),该方法会将工作线程从线程池中remove掉,当全部remove以后,调用tryTerminate(),该方法就不写了,马上下班呵呵。
分享到:
相关推荐
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了...
ThreadPoolExecutor使用和思考
1.资源简介:PyQt5中使用多线程模块QThread解决了PyQt5界面程序执行比较耗时操作时,程序卡顿出现的无响应以及界面输出无法实时...2.适用人群:想学习PyQt5中多线程模块QThread和线程池ThreadPoolExecutor知识的人。
ThreadPoolExecutor源码解析.pdf
ThreadPoolExecutor的使用和Android常见的4种线程池使用介绍
ThreadPoolExecutor源码解析.md
NULL 博文链接:https://bijian1013.iteye.com/blog/2284676
ThreadPoolExecutor线程池,有详尽介绍,本人进行过测试,可以使用
线程池的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,那么超出数量的线程排队等候,等其他线程执行完毕再从队列中取出任务来执行。...
因为asyncio程序中的每个线程都有自己的事件循环,但它只会在主线程中为你自动创建一个事件循环。所以如果你asyncio.get_event_loop在主线程中调用一次,它将自动创建一个循环对象并将其设置为默认值,但是如果你在...
(转)线程池:java_util_ThreadPoolExecutor 比较详细的介绍了ThreadPoolExecutor用法与属性
线程池执行器 使用多线程ThreadPoolExecutor从Web加载图像
主要为大家详细介绍了ThreadPoolExecutor线程池的使用方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
一个关于java 线程池的例子,也适合android
线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...
介绍ThreadPoolExecutor中池和queue配合使用的机制
从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。 相比 threading 等模块,该模块通过 submit 返回的是一个 future ...
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long