`
qindongliang1922
  • 浏览: 2148444 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:116348
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:124617
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:58495
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:70385
社区版块
存档分类
最新评论

JAVA中如何使用线程池来管理并行任务

阅读更多
大多数时候,我们都可以采取多线程+线程池的方式,来优化我们程序的处理效率,JAVA在JDK1.5后的并发包,提供了很多方便快捷的并发工具辅助类,来简化并发编程。

今天散仙,简单的描述下怎么使用CompletionService和Future来快捷的处理多个并行的任务。

需求如下:现在某个盘符的目录下有10个txt文件,每行一个单词,让你利用多线程来快速,去重和并发去重后的结果,写入一个总的txt文件。
(注意,去重有2部分,先是单个txt里面的去重,然后是合并后的去重)

如果的这个需求,利用线程池来处理,是非常方便的。

下面散仙的这两个例子,是模拟如下需求来测试的。
代码如下:

package com.dic.merge;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 测试线程池提交任务
 * @author qindongliang
 * 
 * 
 * 
 * **/
public class TestFutureTask {

	
	
	public static void main(String[] args) throws Exception{
		
		TestFutureTask tft=new TestFutureTask();
		tft.testFutureTask();//测试FutrueTask
	//	tft.testCompletionService();//测试CompletionService
	}
	
	
	
	   /*
	    * 线程池
	    * 
	    * **/
	   ExecutorService es=Executors.newFixedThreadPool(5);
	
	   /*
	    * 
	    * 管理异步提交的task
	    * 
	    * 
	    * **/
	   private   CompletionService<List<String>> cs=new ExecutorCompletionService<List<String>>(es);
	  
	   
	   /**
	    * 异步进行,并获取结果
	    * 
	    * **/
	   public void testCompletionService()throws Exception{
		   
		
		   for(int i=0;i<3;i++){
			   
			  cs.submit(new MyTask());
			 //  Future<List<String>> f=  cs.submit(new MyTask());
			   //在此获取结果是如果第一个任务没有执行完,则其他的任务一直阻塞
//			   if(f.get()!=null){
//				   System.out.println("完成了....: "+f.get());
//			   }
		   }
		   
		   List<Future<List<String>>> list=new ArrayList<Future<List<String>>>();
		   
		   for(int i=0;i<3;i++){
			   list.add(cs.take());
		   }
		   
		   for(Future<List<String>> l:list){
				 System.out.println("有任务执行完毕: "+l.get());
			 }
//		   
		   es.shutdown();
	   }
	public  void testFutureTask()throws Exception{
		List<Future<List<String>>> list=new ArrayList<Future<List<String>>>();
		for(int i=0;i<3;i++){
			 list.add(es.submit(new MyTask()));
			//Future<List<String>> f=es.submit(new MyTask());
			// es.submit(new MyTask());
//			if(f.get()!=null){
//				System.out.println("有任务完成:  "+f.get());
//			}
		}   
		 for(Future<List<String>> l:list){
			 System.out.println("有任务执行完毕: "+l.get());
		 }
//		
		es.shutdown();
		
	}
	
	
	
	
	
	
	/**
	 * 实现Callable接口,带任务返回值的线程类
	 * 
	 * 
	 * ***/
	private class MyTask implements Callable<List<String>>{
		private Random r=new Random();
		@Override
		public List<String> call() throws Exception {
			int count=0;
			 String name=Thread.currentThread().getName();
			System.out.println(Thread.currentThread().getName()+"  开启启动任务......");
				List<String> list=new ArrayList<String>();
				
				for(int i=0;i<5;i++){
					int data=r.nextInt(5);
					//System.out.println(name+"    ==> "+data);
					count+=data;
					Thread.sleep(data*1000);
					list.add(name+"  "+data+"");
			System.out.println(name+"  线程沉睡"+data+" 秒,放入List数据");
				}
			System.out.println(name+"任务完成,总共沉睡时间为: "+count+"s .");
			
			return list;
		}
	}
	
	
	
	
	
	
	
	
	
	
	
}

执行testFutureTask方法的运行输出:
pool-1-thread-2  开启启动任务......
pool-1-thread-3  开启启动任务......
pool-1-thread-1  开启启动任务......
pool-1-thread-3  线程沉睡1 秒,放入List数据
pool-1-thread-1  线程沉睡2 秒,放入List数据
pool-1-thread-3  线程沉睡2 秒,放入List数据
pool-1-thread-2  线程沉睡4 秒,放入List数据
pool-1-thread-3  线程沉睡1 秒,放入List数据
pool-1-thread-1  线程沉睡2 秒,放入List数据
pool-1-thread-3  线程沉睡3 秒,放入List数据
pool-1-thread-1  线程沉睡3 秒,放入List数据
pool-1-thread-2  线程沉睡3 秒,放入List数据
pool-1-thread-2  线程沉睡0 秒,放入List数据
pool-1-thread-1  线程沉睡1 秒,放入List数据
pool-1-thread-2  线程沉睡2 秒,放入List数据
pool-1-thread-2  线程沉睡0 秒,放入List数据
pool-1-thread-2任务完成,总共沉睡时间为: 9s .
pool-1-thread-3  线程沉睡3 秒,放入List数据
pool-1-thread-3任务完成,总共沉睡时间为: 10s .
pool-1-thread-1  线程沉睡3 秒,放入List数据
pool-1-thread-1任务完成,总共沉睡时间为: 11s .
有任务执行完毕: [pool-1-thread-1  2, pool-1-thread-1  2, pool-1-thread-1  3, pool-1-thread-1  1, pool-1-thread-1  3]
有任务执行完毕: [pool-1-thread-2  4, pool-1-thread-2  3, pool-1-thread-2  0, pool-1-thread-2  2, pool-1-thread-2  0]
有任务执行完毕: [pool-1-thread-3  1, pool-1-thread-3  2, pool-1-thread-3  1, pool-1-thread-3  3, pool-1-thread-3  3]

执行testCompletionService方法的输出如下:
pool-1-thread-1  开启启动任务......
pool-1-thread-3  开启启动任务......
pool-1-thread-2  开启启动任务......
pool-1-thread-3  线程沉睡2 秒,放入List数据
pool-1-thread-2  线程沉睡2 秒,放入List数据
pool-1-thread-1  线程沉睡4 秒,放入List数据
pool-1-thread-1  线程沉睡1 秒,放入List数据
pool-1-thread-3  线程沉睡3 秒,放入List数据
pool-1-thread-2  线程沉睡3 秒,放入List数据
pool-1-thread-1  线程沉睡2 秒,放入List数据
pool-1-thread-3  线程沉睡2 秒,放入List数据
pool-1-thread-2  线程沉睡3 秒,放入List数据
pool-1-thread-2  线程沉睡1 秒,放入List数据
pool-1-thread-1  线程沉睡3 秒,放入List数据
pool-1-thread-3  线程沉睡3 秒,放入List数据
pool-1-thread-2  线程沉睡2 秒,放入List数据
pool-1-thread-2任务完成,总共沉睡时间为: 11s .
pool-1-thread-3  线程沉睡2 秒,放入List数据
pool-1-thread-3任务完成,总共沉睡时间为: 12s .
pool-1-thread-1  线程沉睡2 秒,放入List数据
pool-1-thread-1任务完成,总共沉睡时间为: 12s .
有任务执行完毕: [pool-1-thread-2  2, pool-1-thread-2  3, pool-1-thread-2  3, pool-1-thread-2  1, pool-1-thread-2  2]
有任务执行完毕: [pool-1-thread-3  2, pool-1-thread-3  3, pool-1-thread-3  2, pool-1-thread-3  3, pool-1-thread-3  2]
有任务执行完毕: [pool-1-thread-1  4, pool-1-thread-1  1, pool-1-thread-1  2, pool-1-thread-1  3, pool-1-thread-1  2]


观察上面的两个结果,我们就会发现,Future类的先执行完的任务,并不一定是先输出的,而在CompletionService里面,我们可以发现,最先完成的任务,一定是最先输出的,这一点需要注意一下,CompletionService提供了,异步分离任务的机制,将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

大部分情况下,这两种方式,我们都可以任意采用,如果,我们需要先完成的任务先输出,那么我们应该优先选择CompletionService 类,如果不在意任务的先后顺序,仅仅是看到任务的结果,那么我们即可以使用Future,也可以使用CompletionService。



分享到:
评论
1 楼 JavaCFW 2014-05-10  
  

相关推荐

    通用多线程模块(jdk线程池的运用)

    介绍一个通用多线程服务模块。是利用jdk线程池,多线程并行处理多任务,以提高执行效率。

    java高并发相关知识点.docx

    线程池:Java中的线程池机制,包括线程池的创建、执行任务、关闭等操作。 并发集合:Java中的并发集合,包括ConcurrentHashMap、ConcurrentLinkedQueue、CopyOnWriteArrayList等。 并发控制:Java中的并发控制机制,...

    Java 7并发编程实战手册

    java7在并发编程方面,带来了很多令人激动的新功能,这将使你的应用程序具备更好的并行任务性能。 《Java 7并发编程实战手册》是Java 7并发编程的实战指南,介绍了Java 7并发API中大部分重要而有用的机制。全书分为9...

    Java并发编程(学习笔记).xmind

    某些应用程序中存在比较明显的任务边界,而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性 任务的取消和关闭 任务取消 停止基于线程的服务 处理非正常的线程终止 JVM关闭 线程池...

    Java并发编程实战

    6.3.4 在异构任务并行化中存在的局限 6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与...

    Java优化编程(第2版)

    12.2.1 并行任务与多线程 12.2.2 并行任务与死锁 12.3 线程池技术与应用性能优化 12.3.1 线程池 12.3.2 调优线程池的尺寸 12.4 通过线程池技术优化套接字网络编程 小结 第13章 java泛型与应用优化 13.1 认识泛型 ...

    Java并发编程实践 PDF 高清版

    在本书中,这些便利工具的创造者不仅解释了它们究竟如何工作、如何使用,同时,还阐释了创造它们的原因,及其背后的设计模式。 本书既能够成为读者的理论支持,又可以作为构建可靠的,可伸缩的,可维护的并发程序的...

    Java 并发编程实战

    6.3.4 在异构任务并行化中存在的局限 6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与...

    Java并发编程原理与实战

    Java中的阻塞队列原理与使用.mp4 实战:简单实现消息队列.mp4 并发容器ConcurrentHashMap原理与使用.mp4 线程池的原理与使用.mp4 Executor框架详解.mp4 实战:简易web服务器(一).mp4 实战:简易web服务器(二)....

    JAVA并发编程实践_中文版(1-16章全)_1/4

    6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 处理反常的线程终止 7.4 jvm关闭 第8章 应用线程池 8.1 任务与执行策略问的隐性耦合 ...

    第7章-JUC多线程v1.1.pdf

    和创建单一线程池类似, 不同的是线程池中有多个线程, 可以并行处理任务, 若多个线程任务被提交到此线程池, 会有以下执行过程: 如果线程的数量未达到指定数量, 则创建新线程执行任务. 如果线程池的数量达到了指定...

    92道Java多线程与并发面试题含答案(很全)

    线程池(ThreadPool):线程池是一种管理线程资源的模式,它减少了创建和销毁线程的开销,提高了系统的响应速度和吞吐量。 原子操作(Atomic Operations):原子操作是不可中断的操作,即在多线程环境中,这些操作...

    Java并发编程part2

    6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 处理反常的线程终止 7.4 jvm关闭 第8章 应用线程池 8.1 任务与执行策略问的隐性耦合 ...

    Java并发编程实践part1

    6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 处理反常的线程终止 7.4 jvm关闭 第8章 应用线程池 8.1 任务与执行策略问的隐性耦合 ...

    龙果java并发编程完整视频

    第49节Java中的阻塞队列原理与使用00:26:18分钟 | 第50节实战:简单实现消息队列00:11:07分钟 | 第51节并发容器ConcurrentHashMap原理与使用00:38:22分钟 | 第52节线程池的原理与使用00:42:49分钟 | 第53节...

    龙果 java并发编程原理实战

    第49节Java中的阻塞队列原理与使用00:26:18分钟 | 第50节实战:简单实现消息队列00:11:07分钟 | 第51节并发容器ConcurrentHashMap原理与使用00:38:22分钟 | 第52节线程池的原理与使用00:42:49分钟 | 第53节...

    Java中的Fork/Join框架

     fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。  fork/join...

    Java 并发编程原理与实战视频

    第49节Java中的阻塞队列原理与使用00:26:18分钟 | 第50节实战:简单实现消息队列00:11:07分钟 | 第51节并发容器ConcurrentHashMap原理与使用00:38:22分钟 | 第52节线程池的原理与使用00:42:49分钟 | 第53节...

Global site tag (gtag.js) - Google Analytics