The ExecutorService abstraction has been around since Java 5. We are talking about 2004 here. Just a quick reminder: both Java 5 and 6 are no longer supported, and Java 7 won't be in half a year. The reason I'm bringing this up is that many Java programmers still don't fully understand how ExecutorService works. There are many places to learn that; today I want to share a few lesser-known features and practices. This article is still aimed at intermediate programmers, though: nothing especially advanced.
1. Name pool threads
I can't emphasize this enough. When dumping threads of a running JVM or during debugging, the default thread pool naming scheme is pool-N-thread-M, where N stands for the pool sequence number (every time you create a new thread pool, a global N counter is incremented) and M is the thread sequence number within that pool. For example, pool-2-thread-3 means the third thread in the second pool created in the JVM lifecycle. See: Executors.defaultThreadFactory(). Not very descriptive. The JDK makes it slightly complex to name threads properly because the naming strategy is hidden inside ThreadFactory. Luckily Guava has a helper class for that:
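import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Threads will be named Orders-0, Orders-1, ...
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("Orders-%d")
        .setDaemon(true)
        .build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);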
By default thread pools create non-daemon threads, which keep the JVM alive until the pool is shut down; decide whether this suits you or not.
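If you would rather not pull in Guava just for this, a hand-written ThreadFactory is only a few lines. The sketch below is my own illustration, not a JDK or Guava class; the name NamedThreadFactory and its constructor parameters are assumptions.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the JDK or Guava): names threads "<prefix>-<n>".
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        // The counter starts at 0, mirroring the "Orders-%d" format used above.
        Thread thread = new Thread(task, prefix + "-" + counter.getAndIncrement());
        thread.setDaemon(daemon);
        return thread;
    }
}
```

It plugs into the same factory method as before: Executors.newFixedThreadPool(10, new NamedThreadFactory("Orders", true)).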
2. Switch names according to context
This is a trick I learnt from Supercharged jstack: How to Debug Your Servers at 100mph. Once we remember about thread names, we can actually change them at runtime whenever we want! It makes sense because thread dumps show class and method names, not parameters and local variables. By adjusting the thread name to carry some essential transaction identifier we can easily track which message/record/query/etc. is slow or caused a deadlock. Example:
private void process(String messageId) {
    executorService.submit(() -> {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + messageId);
        try {
            // real logic here...
        } finally {
            // restore the pooled thread's original name for the next task
            currentThread.setName(oldName);
        }
    });
}
Inside the try-finally block the current thread is named Processing-WHATEVER-MESSAGE-ID-IS. This might come in handy when tracking down message flow through the system.
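Repeating that try-finally in every task gets old quickly, so the renaming can be pulled into a small decorator. This is only a sketch; ThreadNaming and named() are names I made up for illustration.

```java
// Hypothetical helper: wraps a task so that it runs under a temporary thread name.
final class ThreadNaming {
    private ThreadNaming() {}

    static Runnable named(String name, Runnable task) {
        return () -> {
            Thread current = Thread.currentThread();
            String oldName = current.getName();
            current.setName(name);
            try {
                task.run();
            } finally {
                // Always restore the name, even if the task throws.
                current.setName(oldName);
            }
        };
    }
}
```

With it, the example above shrinks to something like executorService.submit(ThreadNaming.named("Processing-" + messageId, () -> handle(messageId))), where handle() stands for your real logic.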
3. Explicit and safe shutdown
Between client threads and the thread pool there is a queue of tasks. When your application shuts down, you must take care of two things: what happens to queued tasks and how already running tasks behave (more on that later). Surprisingly, many developers do not shut down thread pools properly or consciously. There are two techniques: either let all queued tasks execute (shutdown()) or drop them (shutdownNow()); it totally depends on your use case. For example, if we submitted a bunch of tasks and want to return as soon as all of them are done, use shutdown():
private void sendAllEmails(List<String> emails) throws InterruptedException {
    emails.forEach(email ->
            executorService.submit(() ->
                    sendEmail(email)));
    executorService.shutdown();
    final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
    log.debug("All e-mails were sent so far? {}", done);
}
In this case we send a bunch of e-mails, each as a separate task in a thread pool. After submitting these tasks we shut down the pool so that it no longer accepts any new tasks. Then we wait at most one minute until all these tasks are completed. However, if some tasks are still pending after that minute, awaitTermination() will simply return false. Moreover, pending tasks will continue processing. I know hipsters would go for:
emails.parallelStream().forEach(this::sendEmail);
Call me old-fashioned, but I like to control the number of parallel threads. Never mind, an alternative to the graceful shutdown() is shutdownNow():
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
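This time all queued tasks are discarded and returned. Already running jobs are not killed: shutdownNow() only makes a best-effort attempt to stop them, typically by interrupting their threads, so a task that ignores interruption simply keeps running.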
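The two techniques combine nicely: ask politely first, then get pushy. Below is a minimal sketch of that two-phase shutdown; the helper name Pools.shutdownGracefully and the choice to reuse the same timeout for both phases are my assumptions, not anything mandated by the JDK.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper combining shutdown(), awaitTermination() and shutdownNow().
final class Pools {
    private Pools() {}

    /** Returns true if the pool terminated within the two grace periods. */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                  // no new tasks, queued ones still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            List<Runnable> dropped = pool.shutdownNow();  // drop the queue, interrupt workers
            System.err.println("Dropped " + dropped.size() + " queued tasks");
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();           // preserve the caller's interrupt status
            return false;
        }
    }
}
```

A false result means some task is still running, most likely because it does not react to interruption.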
4. Handle interruption with care
A lesser-known feature of the Future interface is cancelling. Rather than repeating myself, check out my older article: InterruptedException and interrupting threads explained.
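For a quick taste, here is a minimal sketch of what cancelling looks like, assuming the task blocks in an interruptible call such as Thread.sleep(). The class and method names are mine, for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class CancellationDemo {
    private CancellationDemo() {}

    /** Starts a long sleep in a pool, cancels it, and reports whether the task saw the interrupt. */
    static boolean cancelSleepingTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);                  // stands in for blocking work
                } catch (InterruptedException e) {
                    interrupted.countDown();               // clean up here
                    Thread.currentThread().interrupt();    // restore the interrupt flag
                }
            });
            started.await();
            future.cancel(true);                           // true = interrupt the running thread
            return interrupted.await(5, TimeUnit.SECONDS) && future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Note that cancel(true) only delivers an interrupt; a task that never blocks and never checks Thread.interrupted() will not notice it.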
5. Monitor queue length and keep it bounded
Incorrectly sized thread pools may cause slowness, instability and memory leaks. If you configure too few threads, the queue will build up, consuming a lot of memory. Too many threads, on the other hand, will slow down the whole system due to excessive context switches and lead to the same symptoms. It's important to look at the depth of the queue and keep it bounded, so that an overloaded thread pool simply rejects new tasks temporarily:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
        0L, TimeUnit.MILLISECONDS,
        queue);
The code above is equivalent to Executors.newFixedThreadPool(n); however, instead of the default unbounded LinkedBlockingQueue we use an ArrayBlockingQueue with a fixed capacity of 100. This means that if 100 tasks are already queued (and n are being executed), a new task will be rejected with RejectedExecutionException. Also, since queue is now available externally, we can periodically call size() and put it in logs/JMX/whatever monitoring mechanism you use.
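To see the rejection in action, here is a small self-contained sketch. It keeps every worker busy on a latch, so the numbers are deterministic; BoundedPoolDemo and its method are hypothetical names, and the println merely stands in for whatever metrics gauge you use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {
    private BoundedPoolDemo() {}

    /** Submits blocked tasks until the pool rejects one; returns how many were accepted. */
    static int acceptedBeforeRejection(int threads, int queueCapacity) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueCapacity);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await();          // keep workers busy so that the queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
                System.out.println("queue depth: " + queue.size());  // stand-in for a gauge
            }
        } catch (RejectedExecutionException e) {
            return accepted;                      // threads running plus queueCapacity waiting
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With 2 threads and a queue of 3, the sixth task is the first one to be rejected.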
6. Remember about exception handling
What will be the result of the following snippet?
executorService.submit(() -> {
    System.out.println(1 / 0);
});
I got bitten by that too many times: it won't print anything. No sign of java.lang.ArithmeticException: / by zero, nothing. The thread pool just swallows this exception, as if it never happened: submit() stores it in the returned Future, and nobody ever looks at that Future. If it was a good ol' java.lang.Thread created from scratch, UncaughtExceptionHandler could work. But with thread pools you must be more careful. If you are submitting a Runnable (without any result, like above), you must surround the whole body with try-catch and at least log it. If you are submitting a Callable<Integer>, ensure you always dereference it using the blocking get() to re-throw the exception:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
// below will throw ExecutionException caused by ArithmeticException
division.get();
Interestingly, even the Spring framework made this bug with @Async, see: SPR-8995 and SPR-12090.
7. Monitor waiting time in a queue
Monitoring work queue depth is one side. However, when troubleshooting a single transaction/task it's worthwhile to see how much time passed between submitting the task and its actual execution. This duration should preferably be close to 0 (when there was some idle thread in the pool); however, it will grow when the task has to be queued. Moreover, if the pool doesn't have a fixed number of threads, running a new task might require spawning a thread, which also consumes a short amount of time. In order to cleanly monitor this metric, wrap the original ExecutorService with something similar to this:
public class WaitTimeMonitoringExecutorService implements ExecutorService {

    private final ExecutorService target;

    public WaitTimeMonitoringExecutorService(ExecutorService target) {
        this.target = target;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // captured in the submitting (client) thread
        final long startTime = System.currentTimeMillis();
        return target.submit(() -> {
            // evaluated later, in a pool thread
            final long queueDuration = System.currentTimeMillis() - startTime;
            log.debug("Task {} spent {}ms in queue", task, queueDuration);
            return task.call();
        });
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return submit(() -> {
            task.run();
            return result;
        });
    }

    @Override
    public Future<?> submit(Runnable task) {
        return submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                task.run();
                return null;
            }
        });
    }

    //...
}
This is not a complete implementation, but you get the basic idea. The moment we submit a task to a thread pool, we immediately start measuring time. We stop as soon as the task is picked up and begins execution. Don't be fooled by the close proximity of startTime and queueDuration in the source code. In fact these two lines are evaluated in different threads, probably milliseconds or even seconds apart, e.g.:
Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
8. Preserve client stack trace
Reactive programming seems to get a lot of attention these days. Reactive manifesto, reactive streams, RxJava (just released 1.0!), Clojure agents, scala.rx... They all work great, but stack traces are no longer your friend; they are at best useless. Take for example an exception happening in a task submitted to a thread pool:
java.lang.NullPointerException: null
    at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
    at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
    at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
We can easily discover that MyTask threw an NPE at line 76. But we have no idea who submitted this task, because the stack trace reveals only Thread and ThreadPoolExecutor. We can technically navigate through the source code in the hope of finding just one place where MyTask is created. But without threads (not to mention event-driven, reactive, actor-ninja-programming) we would immediately see the full picture. What if we could preserve the stack trace of the client code (the one which submitted the task) and show it, e.g. in case of failure? The idea isn't new; for example, Hazelcast propagates exceptions from the owner node to client code. This is how naïve support for keeping the client stack trace in case of failure could look:
public class ExecutorServiceWithClientTrace implements ExecutorService {

    protected final ExecutorService target;

    public ExecutorServiceWithClientTrace(ExecutorService target) {
        this.target = target;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                log.error("Exception {} in task submitted from thread {} here:", e, clientThreadName, clientStack);
                throw e;
            }
        };
    }

    // Created in the client thread, so its stack trace records the submission site.
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return tasks.stream().map(this::submit).collect(toList());
    }

    //...
}
This time, in case of failure, we retrieve the full stack trace and thread name of the place where the task was submitted. Much more valuable compared to the standard exception seen earlier:
Exception java.lang.NullPointerException in task submitted from thread main here:
java.lang.Exception: Client stack trace
    at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
    at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
    at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9. Prefer CompletableFuture
In Java 8 the more powerful CompletableFuture was introduced. Please use it whenever possible. ExecutorService wasn't extended to support this enhanced abstraction, so you have to take care of it yourself. Instead of:
final Future<BigDecimal> future =
        executorService.submit(this::calculate);
do:
final CompletableFuture<BigDecimal> future =
        CompletableFuture.supplyAsync(this::calculate, executorService);
CompletableFuture implements Future, so everything works as it used to. But more advanced consumers of your API will truly appreciate the extended functionality given by CompletableFuture.
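To give a flavour of that extended functionality, here is a minimal sketch that chains a transformation and a fallback onto the asynchronous computation without blocking. PriceCalculator, the fixed net price and the 23% tax rate are all made up for illustration; calculate() merely stands in for the method used above.

```java
import java.math.BigDecimal;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

final class PriceCalculator {
    private PriceCalculator() {}

    // Hypothetical stand-in for the calculate() method used above.
    static BigDecimal calculate() {
        return new BigDecimal("100.00");
    }

    /** Runs calculate() in the given pool, then adds 23% tax without blocking the caller. */
    static CompletableFuture<BigDecimal> grossPrice(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(PriceCalculator::calculate, pool)
                .thenApply(net -> net.multiply(new BigDecimal("1.23")))
                // Illustrative fallback only; pick a failure policy that suits your domain.
                .exceptionally(failure -> BigDecimal.ZERO);
    }
}
```

Callers who only know plain Future can still call get() on the result; everyone else can keep composing with thenApply(), thenCombine() and friends.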
10. Synchronous queue
SynchronousQueue is an interesting BlockingQueue that's not really a queue. It's not even a data structure per se. It's best explained as a queue with a capacity of 0. Quoting the JavaDoc:
each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. [...]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.
How is this related to thread pools? Try using SynchronousQueue with ThreadPoolExecutor:
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
        0L, TimeUnit.MILLISECONDS,
        queue);
We created a thread pool with two threads and a SynchronousQueue in front of it. Because SynchronousQueue is essentially a queue with 0 capacity, such an ExecutorService will only accept new tasks if there is an idle thread available. If all threads are busy, a new task will be rejected immediately and will never wait. This behavior might be desirable when processing in the background must start immediately or be discarded.
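A quick way to convince yourself is to occupy both workers and try to sneak in a third task. The sketch below does exactly that; SynchronousQueueDemo is a name I made up, and the latch is just a convenient way of keeping the workers busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SynchronousQueueDemo {
    private SynchronousQueueDemo() {}

    /** Occupies both workers, then reports whether a third task is rejected. */
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
                0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();                  // hold the worker until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked);                // starts worker 1
            pool.execute(blocked);                // starts worker 2
            pool.execute(blocked);                // no idle worker and nowhere to wait
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

If you prefer back-pressure to an exception, pass a RejectedExecutionHandler such as ThreadPoolExecutor.CallerRunsPolicy as the last constructor argument.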
That's it, I hope you found at least one interesting feature!