Future的局限
在使用线程池批量并行执行任务时,有时需要获取任务的返回值,此时一般可以使用Future实现异步接收,关于Future的相关使用以及实现原理在前一篇文章中分析过(如需详细了解,请点击这里)。但在批量等待获取返回结果时,有些局限,下面先看一个真实的场景。
在对一个页面进行并行渲染时,一般的做法是把页面分成多个模块,每个模块作为一个任务单独提交到线程池中进行并行渲染。在主线程中,需要接受每个模块渲染的结果,对页面进行渲染。如果直接使用Future,这时需要遍历每个模块渲染任务对应的Future,调用其get方法阻塞获取渲染结果,此时会存在多次阻塞,遍历伪代码如下:
for(Future oneFuture:futureList){ Object obj = oneFuture.get(); // oneFuture.get() 可能会存在多次阻塞 //省略使用obj渲染渲染页面代码。 }
最理想的方式是futureList是一个排序后的列表,排前面的都是已经执行完成的,这时可以达到最高的并发效果(下一次循环时,有可能刚好下一个任务执行完成,主线程又可以继续执行自己的业务逻辑)。那要如何对这个列表排序呢?庆幸的是我们不必自己排序,java提供了现成的API: CompletionService,直接使用即可。
CompletionService的使用方法
CompletionService是一个接口类,目前其唯一实现类是ExecutorCompletionService。他的主要功能就是在批量提交的任务中,优先获取已经完成的任务(可以简单的理解为对任务执行完成的先后顺序进行排序)。具体使用方式如下:
public class CompletionServiceTest { public static void main(String[] args) throws Exception{ //创建线程池 Executor executor = Executors.newCachedThreadPool(); Callable<String> temp = null; //创建任务列表 Collection<Callable<String>> tasks = new ArrayList<>(); for (int i=0;i<10;i++){ temp = new PrintTask(i+""); tasks.add(temp); } solve(executor,tasks); } public static void solve(Executor e, Collection<Callable<String>> solvers) throws InterruptedException { //构建CompletionService CompletionService<String> ecs = new ExecutorCompletionService<String>(e); int n = solvers.size(); List<Future<String>> futures = new ArrayList<Future<String>>(n); try { for (Callable<String> s : solvers) futures.add(ecs.submit(s));//提交多个任务 //获取任务执行结果 for (int i = 0; i < n; ++i) { try { Future<String> future = ecs.take();//阻塞优先获取已经完成的任务Future String r = future.get();//这步不会阻塞了 System.out.println(r); } catch (ExecutionException ignore) {} } } finally { for (Future<String> f : futures) //判断是否需要取消任务 f.cancel(true); } } } class PrintTask implements Callable<String>{ private String taskname; public PrintTask(String taskname) { this.taskname = taskname; } @Override public String call() throws Exception { System.out.println("任务"+taskname+":执行中"); Thread.sleep(Integer.parseInt(taskname)*1000); return "任务"+taskname+":执行完成;"; } }
本示例中通过CompletionService的take方法,每次获取到的都是最先完成任务对应的Future,由于任务已经完成,后面调用Future的get方法,就不会产生阻塞。这是如何实现的呢?下面开始对ExecutorCompletionService的实现原理进行分析。
ExecutorCompletionService的实现原理
ExecutorCompletionService在其内部维护了一个阻塞队列BlockingQueue,每当有任务执行完成后,都会放入这个队列。ExecutorCompletionService的take方法本质上是调用的BlockingQueue的take方法,如果队列中没有完成的任务,就阻塞;如果队列中有多个完成的任务,由于BlockingQueue(默认是LinkedBlockingQueue)是FIFO队列,每次take取出的都是优先完成的任务。这就是对ExecutorCompletionService实现原理简述,下面来来具体的实现。
构造方法
ExecutorCompletionService的构造方法有两个,一个是使用默认的LinkedBlockingQueue作为完成任务的存放队列,另一是使用传入的BlockingQueue 参数作为完成任务的存放队列:
//默认使用LinkedBlockingQueue作为存放队列 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } //使用自定义的BlockingQueue作为存放队列 public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
提交任务submit方法
ExecutorCompletionService中有两个版本的submit方法,一个是Callable类型的参数;另一个是Runnable类型的参数。两个方法的实现基本相同,只是由于Runnable的run方法没有返回值,本质上差异就是需要把Runnable对象使用适配器模式封装成Callable对象(这里的适配器为RunnableAdapter,关于更多适配器模式的理解可以点击这里)。这里只对参数为Callable类型的submit方法进行讲解:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); //把Callable对象封装成FutureTask对象 RunnableFuture<V> f = newTaskFor(task); //把FutureTask对象封装成QueueingFuture对象,并提交任务到线程池 executor.execute(new QueueingFuture(f)); return f; }
可以看到submit方法,本质上上只是先把Callable对象封装成FutureTask对象,在封装成QueueingFuture对象,然后提交到线程池中。相比于线程池的submit方法,该方法只多了一步:把把FutureTask对象封装成QueueingFuture对象。再来看下内部类QueueingFuture的实现,非常简单:
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
QueueingFuture基础自FutureTask,重新了父类的done()方法(该方法在FutureTask中是空的)。done()方法的重新也很简单,只是往阻塞队列中添加该任务。回想下这个done()方式是在什么时候调用的?FutureTask的run方法在任务执行完成后会调用finishCompletion方法,finishCompletion方法末尾调用了done()方法。换句话说,让任务执行完成时会把自身放入ExecutorCompletionService维护的“已完成阻塞队列”:存在于这个队列中的任务都是已经完成的。
take方法
掉用ExecutorCompletionService的take方法,本质上调用的是“已完成阻塞队列”的take方法:
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
另外ExecutorCompletionService还提供了非阻塞的poll方法,以及延时阻塞的poll方法,本质上也是直接调用阻塞队列的对应poll方法:
public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
这两个方法,可以放到一个while循环中,当没有任务执行完成时,主线程可以做一些其他事情(或者sleep一会儿),防止线程阻塞,过一会儿再继续poll 以使并行执行效果最大化。
总结
CompletionService的主要作用就是优先返回线程池中已经执行完成的任务,尽量减少主线程的阻塞时间,是并行最大化。本质上通过维护一个“已完成的阻塞队列”实现。
相关推荐
java 线程池 完整 源码 java 线程池 完整 源码
简单的线程池程序+中文文档 包结构: com.tangkai.threadpool --SimpleThread.java 工作线程 --TestThreadPool.java 程序入口 --ThreadPoolManager.java 线程池管理类
主要给大家介绍了关于java线程池使用后到底要不要关闭的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
Reference: 《创建Java线程池》[1],《Java线程:新特征-线程池》[2], 《Java线程池学习》[3],《线程池ThreadPoolExecutor使用简介》[4],《Java5中的线程池实例讲解》[5],《ThreadPoolExecutor使用和思考》[6] ...
2.然后根据提示运行java命令执行示例程序,观看线程池的运行结果 目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来...
Java线程池使用说明Java线程池使用说明Java线程池使用说明
java线程池知识、
java 线程池 java 线程池 java 线程池 java 线程池
基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池技术实现Knock Knock游戏项目.zip 基于Java线程池...
java线程池实例java线程池实例E:\Users\Administrator\workspace
java线程池封装j
java技术学习——基于Java线程池技术实现Knock Knock游戏项目(包含服务端、客户端两部分) java技术学习——基于Java线程池技术实现Knock Knock游戏项目(包含服务端、客户端两部分) java技术学习——基于Java...
讲述了java线程池的优点,参数,6种线程池的使用场景,线程池用到的handler,线程任务的提交方式等等。
自定义实现Java线程池,学习大师设计思想,瞻仰大神笔法
java线程池的源码分析以及各种池之间的对比;
基于Java线程池技术的数据爬虫设计与实现.pdf
JAVA线程池的原理与实现.pdf
1.媲美java线程池框架,整套源码资源,使用Intellij Idea开发工具,JDK1.8以上 2.带有测试代码 3.可以根据项目实际情况任意调整代码 4.任务队列、拒绝策略 5.BasicThreadPool.java、LinkedRunnableQueue.java、...
NULL 博文链接:https://yulincqupt.iteye.com/blog/1673919
JAVA使用线程池查询大批量数据