`
langgufu
  • 浏览: 2291254 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

使用CompletionService批处理任务(线程池阻塞线程)

阅读更多

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。

 

[java] view plaincopy
  1. import java.util.Random;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.CompletionService;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorCompletionService;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11.   
  12. public class Test17 {  
  13.     public static void main(String[] args) throws Exception {  
  14.         Test17 t = new Test17();  
  15.         t.count1();  
  16.         t.count2();  
  17.     }  
  18. //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
  19.     public void count1() throws Exception{  
  20.         ExecutorService exec = Executors.newCachedThreadPool();  
  21.         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
  22.         for(int i=0; i<10; i++){  
  23.             Future<Integer> future =exec.submit(getTask());  
  24.             queue.add(future);  
  25.         }  
  26.         int sum = 0;  
  27.         int queueSize = queue.size();  
  28.         for(int i=0; i<queueSize; i++){  
  29.             sum += queue.take().get();  
  30.         }  
  31.         System.out.println("总数为:"+sum);  
  32.         exec.shutdown();  
  33.     }  
  34. //使用CompletionService(完成服务)保持Executor处理的结果  
  35.     public void count2() throws InterruptedException, ExecutionException{  
  36.         ExecutorService exec = Executors.newCachedThreadPool();  
  37.         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
  38.         for(int i=0; i<10; i++){  
  39.             execcomp.submit(getTask());  
  40.         }  
  41.         int sum = 0;  
  42.         for(int i=0; i<10; i++){  
  43. //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
  44.             Future<Integer> future = execcomp.take();  
  45.             sum += future.get();  
  46.         }  
  47.         System.out.println("总数为:"+sum);  
  48.         exec.shutdown();  
  49.     }  
  50.     //得到一个任务  
  51.     public Callable<Integer> getTask(){  
  52.         final Random rand = new Random();  
  53.         Callable<Integer> task = new Callable<Integer>(){  
  54.             @Override  
  55.             public Integer call() throws Exception {  
  56.                 int i = rand.nextInt(10);  
  57.                 int j = rand.nextInt(10);  
  58.                 int sum = i*j;  
  59.                 System.out.print(sum+"\t");  
  60.                 return sum;  
  61.             }  
  62.         };  
  63.         return task;  
  64.     }  
  65.     /** 
  66.      * 执行结果: 
  67.         6   6   14  40  40  0   4   7   0   0   总数为:106 
  68.         12  6   12  54  81  18  14  35  45  35  总数为:312 
  69.      */  
  70. }  

ExecutorCompletionService统一了ExecutorService和BlockingQueue,既有线程池功能,能提交任务,又有阻塞队列功能,能判断所有线程的执行结果。

分享到:
评论

相关推荐

    阻塞线程池 阻塞线程池 阻塞线程池

    阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池...

    Python 使用threading+Queue实现线程池示例

    在线程池缓存线程可用已有的闲置线程来执行新任务,避免了创建/销毁带来的系统开销。 1.2 线程并发数量过多,抢占系统资源从而导致阻塞。 线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步问题)、PriorityBlockingQueue示例、高性能无阻塞无界队列: ConcurrentLinkedQueue、DelayQueue...

    并发-线程池和阻塞队列.pdf

    讲述线程池原理,线程池使用场景和注意事项,手动创建线程池方法,注意事项,阻塞队列的相关知识

    基础知识五、Python实现线程池之线程安全队列

    文章目录一、线程池组成二、线程安全队列的实现三、测试逻辑3.1、测试阻塞逻辑3.2、测试读写加锁逻辑 一、线程池组成  一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中...

    java线程池概念.txt

    //如果线程池线程大小大于核心线且且添加任务到线程失败,就把任务添加到阻塞队列 if (poolSize &gt;= corePoolSize || !addIfUnderCorePoolSize(command)) {//新建线程并启动 if (runState == RUNNING && workQueue...

    非阻塞线程池框架,管理线程,管理连接

    非阻塞线程池框架。利用该API提高代码线程效率,参考该代码更好管理线程,管理连接。

    在线程池中创建多少线程比较合理?

    在设置线程池线程个数的时候,经常会想到这个问题,是不是设置的线程数越多越好?理解这个问题之前我们要先清楚的知道我们为什么使用多线程。 为什么会使用多线程 使用多线程的主要目的我们应该都能回答的出来就是...

    并发-线程池和阻塞队列

    并发-线程池和阻塞队列 并发-线程池和阻塞队列 并发-线程池和阻塞队列

    java线程详解

    二、单任务线程池 三、可变尺寸的线程池 四、延迟连接池 五、单任务延迟连接池 六、自定义线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上) Java线程:新特征-阻塞队列 Java线程:新特征-...

    论文研究-Linux下通用线程池的改进与实现.pdf

    对线程池的阻塞唤醒机制,动态调整,线程安全退出,参数处理,系统线程数限制等细节进行研究,保证了其在不同应用场景下的独立性和通用性;同时采用一种基于数组的链表机制来改进线程池的查找分配算法,将其时间...

    java线程池详解

    正好,线程池缓存线程,可用已有的闲置线程来执行新任务,避免了T1+T3带来的系统开销 2. 线程并发数量过多,抢占系统资源从而导致阻塞 我们知道线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源...

    Java多线程编程总结

    Java线程:新特征-线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上) Java线程:新特征-锁(下) Java线程:新特征-信号量 Java线程:新特征-阻塞队列 Java线程:新特征-阻塞栈 Java线程:新特征-...

    Java 线程状态、线程池

    java 线程状态、线程池 1. java 的线程状态 状态 发生条件 NEW 线程刚刚被创建,没有启动,没有调用start方法 RUNNABLE(可运行) 线程已经在JVM中运行,但是是否运行不确定,看当前线程是否由CPU执行权 ...

    java多线程编程总结

    Java线程:新特征-线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上) Java线程:新特征-锁(下) Java线程:新特征-信号量 Java线程:新特征-阻塞队列 Java线程:新特征-阻塞栈 Java线程:新特征-...

    java多线程实现生产者和消费者

    java多线程实现生产者和消费者 ,4种实现方式,分别为synchronizated,condition和lock,信号量,阻塞队列

    浅谈Android 的线程和线程池的使用

    Android 的线程和线程池 从用途上分,线程分为主线程和子线程;主线程主要处理和界面相关的事情,子线程则往往用于耗时操作。 主线程和子线程 主线程是指进程所拥有的线程。Android 中主线程交 UI 线程,主要作用是...

    Java几种线程池类型介绍及使用.docx

    1.使用new Thread()创建线程的弊端: 每次通过new Thread()创建对象...可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。 提供定时执行、定期执行、单线程、并发数控制等功能。

Global site tag (gtag.js) - Google Analytics