`

java线程池之CompletionService

阅读更多

 

Future的局限

 

在使用线程池批量并行执行任务时,有时需要获取任务的返回值,此时一般可以使用Future实现异步接收,关于Future的相关使用以及实现原理在前一篇文章中分析过(如需详细了解,请点击这里)。但在批量等待获取返回结果时,有些局限,下面先看一个真实的场景。

 

在对一个页面进行并行渲染时,一般的做法是把页面分成多个模块,每个模块作为一个任务单独提交到线程池中进行并行渲染。在主线程中,需要接受每个模块渲染的结果,对页面进行渲染。如果直接使用Future,这时需要遍历每个模块渲染任务对应的Future,调用其get方法阻塞获取渲染结果,此时会存在多次阻塞,遍历伪代码如下:

 

for(Future oneFuture:futureList){
   Object obj = oneFuture.get(); // oneFuture.get() 可能会存在多次阻塞
   //省略使用obj渲染渲染页面代码。
}
 

 

 

最理想的方式是futureList是一个排序后的列表,排前面的都是已经执行完成的,这时可以达到最高的并发效果(下一次循环时,有可能刚好下一个任务执行完成,主线程又可以继续执行自己的业务逻辑)。那要如何对这个列表排序呢?庆幸的是我们不必自己排序,java提供了现成的API: CompletionService,直接使用即可。

 

CompletionService的使用方法

 

CompletionService是一个接口类,目前其唯一实现类是ExecutorCompletionService。他的主要功能就是在批量提交的任务中,优先获取已经完成的任务(可以简单的理解为对任务执行完成的先后顺序进行排序)。具体使用方式如下:

 

public class CompletionServiceTest {
    public static void main(String[] args) throws Exception{
        //创建线程池
        Executor executor = Executors.newCachedThreadPool();
        Callable<String> temp = null;
        //创建任务列表
        Collection<Callable<String>> tasks = new ArrayList<>();
        for (int i=0;i<10;i++){
            temp = new PrintTask(i+"");
            tasks.add(temp);
        }
        solve(executor,tasks);
    }
 
    public static void solve(Executor e, Collection<Callable<String>> solvers)
            throws InterruptedException {
        //构建CompletionService
        CompletionService<String> ecs = new ExecutorCompletionService<String>(e);
        int n = solvers.size();
        List<Future<String>> futures = new ArrayList<Future<String>>(n);
        try {
            for (Callable<String> s : solvers)
                futures.add(ecs.submit(s));//提交多个任务
 
            //获取任务执行结果
            for (int i = 0; i < n; ++i) {
                try {
                    Future<String> future = ecs.take();//阻塞优先获取已经完成的任务Future
                    String r = future.get();//这步不会阻塞了
                    System.out.println(r);
                } catch (ExecutionException ignore) {}
            }
        }
        finally {
            for (Future<String> f : futures)
                //判断是否需要取消任务
                f.cancel(true);
        }
    }
}
 
class PrintTask implements Callable<String>{
 
    private String taskname;
 
    public PrintTask(String taskname) {
        this.taskname = taskname;
    }
 
    @Override
    public String call() throws Exception {
        System.out.println("任务"+taskname+":执行中");
        Thread.sleep(Integer.parseInt(taskname)*1000);
        return "任务"+taskname+":执行完成;";
    }
}
 

 

 

本示例中通过CompletionServicetake方法,每次获取到的都是最先完成任务对应的Future,由于任务已经完成,后面调用Futureget方法,就不会产生阻塞。这是如何实现的呢?下面开始对ExecutorCompletionService的实现原理进行分析。

 

ExecutorCompletionService的实现原理

 

ExecutorCompletionService在其内部维护了一个阻塞队列BlockingQueue,每当有任务执行完成后,都会放入这个队列。ExecutorCompletionServicetake方法本质上是调用的BlockingQueuetake方法,如果队列中没有完成的任务,就阻塞;如果队列中有多个完成的任务,由于BlockingQueue(默认是LinkedBlockingQueue)是FIFO队列,每次take取出的都是优先完成的任务。这就是对ExecutorCompletionService实现原理简述,下面来来具体的实现。

 

构造方法

ExecutorCompletionService的构造方法有两个,一个是使用默认的LinkedBlockingQueue作为完成任务的存放队列,另一是使用传入的BlockingQueue 参数作为完成任务的存放队列:

 
//默认使用LinkedBlockingQueue作为存放队列
public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
 
//使用自定义的BlockingQueue作为存放队列
public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
}
 

 

提交任务submit方法

ExecutorCompletionService中有两个版本的submit方法,一个是Callable类型的参数;另一个是Runnable类型的参数。两个方法的实现基本相同,只是由于Runnablerun方法没有返回值,本质上差异就是需要把Runnable对象使用适配器模式封装成Callable对象(这里的适配器为RunnableAdapter,关于更多适配器模式的理解可以点击这里)。这里只对参数为Callable类型的submit方法进行讲解:

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        //把Callable对象封装成FutureTask对象
        RunnableFuture<V> f = newTaskFor(task);
        //把FutureTask对象封装成QueueingFuture对象,并提交任务到线程池
        executor.execute(new QueueingFuture(f));
        return f;
}
 

 

可以看到submit方法,本质上上只是先把Callable对象封装成FutureTask对象,在封装成QueueingFuture对象,然后提交到线程池中。相比于线程池的submit方法,该方法只多了一步:把把FutureTask对象封装成QueueingFuture对象。再来看下内部类QueueingFuture的实现,非常简单:

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

QueueingFuture基础自FutureTask,重新了父类的done()方法(该方法在FutureTask中是空的)。done()方法的重新也很简单,只是往阻塞队列中添加该任务。回想下这个done()方式是在什么时候调用的?FutureTaskrun方法在任务执行完成后会调用finishCompletion方法,finishCompletion方法末尾调用了done()方法。换句话说,让任务执行完成时会把自身放入ExecutorCompletionService维护的已完成阻塞队列:存在于这个队列中的任务都是已经完成的。

 

take方法

掉用ExecutorCompletionServicetake方法,本质上调用的是已完成阻塞队列take方法:

public Future<V> take() throws InterruptedException {
        return completionQueue.take();
}
 

 

另外ExecutorCompletionService还提供了非阻塞的poll方法,以及延时阻塞的poll方法,本质上也是直接调用阻塞队列的对应poll方法:

public Future<V> poll() {
        return completionQueue.poll();
    }
 
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
}

 

这两个方法,可以放到一个while循环中,当没有任务执行完成时,主线程可以做一些其他事情(或者sleep一会儿),防止线程阻塞,过一会儿再继续poll 以使并行执行效果最大化。

 

总结

 

CompletionService的主要作用就是优先返回线程池中已经执行完成的任务,尽量减少主线程的阻塞时间,是并行最大化。本质上通过维护一个已完成的阻塞队列实现。

 

 

 

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics