`
Donald_Draper
  • 浏览: 950956 次
社区版块
存档分类
最新评论

ExecutorCompletionService解析

    博客分类:
  • JUC
阅读更多
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239

package java.util.concurrent;

/**
 * A {@link CompletionService} that uses a supplied {@link Executor}
 * to execute tasks.  This class arranges that submitted tasks are,
 * upon completion, placed on a queue accessible using {@code take}.
 * The class is lightweight enough to be suitable for transient use
 * when processing groups of tasks.
 *
 ExecutorCompletionService是执行器的补充或者说是辅助,用于执行任务。
 任务提交到ExecutorCompletionService执行后,如果任务执行完,则添加到完成
 队列,我们可以通过take,取回完成任务的结果。ExecutorCompletionService
 是处理集合任务的一个轻量级的实现。

 * <p>
 *
 * <b>Usage Examples.</b>
 *
 实例:
 * Suppose you have a set of solvers for a certain problem, each
 * returning a value of some type {@code Result}, and would like to
 * run them concurrently, processing the results of each of them that
 * return a non-null value, in some method {@code use(Result r)}. You
 * could write this as:
 *
假设将一个确定的问题,分成n个部分,这n个部分可以并发执行,每个部分返回一个非null,结果
在其他一些方法中,使用这些结果。
 * <pre> {@code
 * void solve(Executor e,
 *            Collection<Callable<Result>> solvers)
 *     throws InterruptedException, ExecutionException {
 *     CompletionService<Result> ecs
 *         = new ExecutorCompletionService<Result>(e);
 *     for (Callable<Result> s : solvers)
 *         ecs.submit(s);
 *     int n = solvers.size();
 *     for (int i = 0; i < n; ++i) {
 *         Result r = ecs.take().get();
 *         if (r != null)
 *             use(r);
 *     }
 * }}</pre>
 *
 * Suppose instead that you would like to use the first non-null result
 * of the set of tasks, ignoring any that encounter exceptions,
 * and cancelling all other tasks when the first one is ready:
 *
 假设我们仅想用任务集合中第一个完成任务的结果,并忽略其他任务遇到的异常,
 当第一个完成任务的结果可用时,取消其他任务。
 * <pre> {@code
 * void solve(Executor e,
 *            Collection<Callable<Result>> solvers)
 *     throws InterruptedException {
 *     CompletionService<Result> ecs
 *         = new ExecutorCompletionService<Result>(e);
 *     int n = solvers.size();
 *     List<Future<Result>> futures
 *         = new ArrayList<Future<Result>>(n);
 *     Result result = null;
 *     try {
 *         for (Callable<Result> s : solvers)
 *             futures.add(ecs.submit(s));
 *         for (int i = 0; i < n; ++i) {
 *             try {
 *                 Result r = ecs.take().get();
 *                 if (r != null) {
 *                     result = r;
 *                     break;
 *                 }
 *             } catch (ExecutionException ignore) {}
 *         }
 *     }
 *     finally {
 *         for (Future<Result> f : futures)
 *             f.cancel(true);
 *     }
 *
 *     if (result != null)
 *         use(result);
 * }}</pre>
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;//执行器
    private final AbstractExecutorService aes;//执行器服务
    private final BlockingQueue<Future<V>> completionQueue;//任务完成队列

    /**
     * FutureTask extension to enqueue upon completion
     扩展FutureTask的队列完成任务
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
	//这个是关键,在FutureTask那篇文章中,我们有讲,及当任务完成时调用done方法,
	//done方法为抽象方法,待子类扩展
        protected void done() { 
	        //当任务任务执行结束时,添加到完成队列
		completionQueue.add(task); 
	}
        private final Future<V> task;
    }
    //根据Callable,创建
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }
    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     根据执行器构建ExecutorCompletionService,完成队列默认为LinkedBlockingQueue
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
	//将完成任务放在LinkedBlockingQueue中
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     根据执行器和完成队列构建ExecutorCompletionService
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed taskes cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
   //提交执行Callable任务
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
   //提交执行Runnable任务
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    //从完成队列take完成任务的结果,没有则阻塞,直到有任务完成
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    //从完成队列获取完成任务的结果,没有则返回null
    public Future<V> poll() {
        return completionQueue.poll();
    }
    //从完成队列获取完成任务的结果,没有则超时等待,如果超时等待期间还没有完成任务,返回为null
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
}

总结:
ExecutorCompletionService内部关联一个执行器AbstractExecutorService和
一个阻塞的任务完成队列,默认为LinkedBlockingQueue。当提交任务,则包装成QueueingFuture,QueueingFuture
扩展了FutureTask,重写done方法,即在任务执行结束时,添加任务执行结果到完成队列。
而take,poll,超时poll直接委托给完成队列。
0
0
分享到:
评论

相关推荐

    Callable,Future的使用方式

    Callable,Future的使用方式,里面使用了三种使用方式分别是FutureTask,ExecutorService,ExecutorCompletionService

    多线程相关代码(新)

    包括阻塞队列、阻塞栈、ExecutorService、Future、ExecutorCompletionService、死锁、join、重入锁、读写锁、多线程抢票、信号量、signal/await、ThreadLocal等的实例。

    Toxi / Oxy Pro 便携式气体检测仪参考手册 使用说明书

    Toxi Oxy Pro 便携式气体检测仪参考手册 使用说明书

    科傻模拟网优化操作-教程书

    官方的的说明书资料,部分视频说明在这里: https://www.bilibili.com/video/BV1Fz4y1d7rn/?spm_id_from=333.999.0.0&vd_source=13dc65dbb4ac9127d9af36e7b281220e

    node-v8.14.0-x64.msi

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    2023商业银行数据资产体系白皮书,主要介绍了“三位一体”数据资产体系的构成与工作机制,以及商业银行数据资产体系建设实践

    2023商业银行数据资产体系白皮书 目录 第 1 章 数据资产化与数据要素市场化相辅相成,相互促进 第 2 章 数据资产化是企业数据治理向上演进的必经之路 第 3 章 数据资产体系发展概述 第 4 章 “三位一体”数据资产体系的构思 4.1“三位一体”数据资产体系的构成与工作机制 数据资产管理 数据资产运营 数据资产评价 数据资产体系工作机制 4.2“三位一体”数据资产体系的相互作用关系 4.3“三位一体”数据资产体系的构建 4.4“三位一体”数据资产体系的优势 第 5 章 商业银行数据资产体系建设实践 5.1商业银行开展数据资产体系建设的背景和目标 5.2商业银行数据资产体系建设的工作步骤 5.3上海银行数据资产体系建设实践的主要成果 第 6 章 数据要素流通市场赋能企业数据资产化 6.1全国多层次数据要素市场的建设 6.2上海数据交易所赋能企业数据资产化 6.3数据要素流通交易市场赋能企业数据资产化的展望 第 7 章 未来演进与展望

    基于微信小程序的助农扶贫小程序

    大学生毕业设计、大学生课程设计作业

    车辆销售数据Python爬取并做数据分析,项目源码注解清晰一看就懂.zip

    车辆销售数据Python爬取并做数据分析,项目源码注解清晰一看就懂

    毕业设计:基于SSM的mysql-学生社团管理系统(源码 + 数据库 + 说明文档)

    毕业设计:基于SSM的mysql_学生社团管理系统(源码 + 数据库 + 说明文档) 第2章 主要技术和工具介绍 1 2.1 JSP语言 1 2.2 MySQL数据库 1 2.3 jsp技术 2 2.4ssm简介 3 第3章 系统分析 1 3.1可行性分析 1 3.1.1经济可行性 1 3.1.2技术可行性 1 3.1.3操作可行性 1 3.2需求分析 1 3.3业务流程分析 2 3.4数据流程分析 3 第4章 系统设计 5 4.1系统结构设计 5 4.2功能模块设计 5 4.3数据库设计 6 4.3.1数据库设计概述 6 4.3.1概念设计 6 4.3.2表设计 7 第5章 系统实现 15 5.1基本任务 15 5.2登录模块的实现 15 5.2.1首页实现 15 5.2.2管理员后台登录 16 5.3用户模块的实现 19 5.3.1注册模块及登录的实现 19 5.2.2入团模块的实现 21 5.2.3场地预约模块的实现 22 5.4管理员模块的实现 24 5.4.1系统用户管理模块的实现 24 5.4.2活动公告管理模块的实现 26 5.5社团模块的实现 28 5.5.1活动信息

    大健康零售业务O2O数字化战略规划方案.pptx

    大健康零售业务O2O数字化战略规划方案.pptx

    数据中台项目主要岗位及其职责和任务

    数据中台项目主要岗位及其职责和任务

    node-v8.0.0-linux-armv7l.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    流程制造行业数字化智能工厂总体规划建设方案.pptx

    流程制造行业数字化智能工厂总体规划建设方案.pptx

    c语言学生成绩管理系统源码.zip

    c语言学生成绩管理系统源码.zip

    DEV-C++-5.11下载链接

    DEV-C++-5.11下载链接

    电器租赁小程序.zip

    电器租赁小程序.zip

    学生成绩管理系统 数据结构与算法课程设计 C++.zip

    学生成绩管理系统 数据结构与算法课程设计 C++

    知乎小程序算法.zip

    知乎小程序算法.zip

    基于R语言SIR传染病传播的SIR模型,很全,可直接应用仿真模拟.rar

    基于R语言SIR传染病传播的SIR模型,很全,可直接应用仿真模拟.rar

Global site tag (gtag.js) - Google Analytics