`

利用多核CPU计算大的列表中的整数和 CyclicBarrier CountDownLatch ExecutorService

阅读更多

原文:http://flysnow.iteye.com/blog/711162

原文利用了CyclicBarrier、ExecutorService和Callable技术,这些我还不是太懂。将原实现改了一下

 

public class CountListIntegerSum1 {
	private Long sum=0L;//存放整数的和
	private List<Integer> list;//整数集合List
	private int threadCounts;//使用的线程数
	public CountListIntegerSum1(List<Integer> list,int threadCounts) {
		this.list=list;
		this.threadCounts=threadCounts;
	}
	/**
	 * 获取List中所有整数的和
	 */
	public void getIntegerSum(){
		int len=list.size()/threadCounts;//平均分割List
		if(len==0) {
			threadCounts=list.size();//采用一个线程处理List中的一个元素
			len=list.size()/threadCounts;//重新平均分割List
		}
		int tmp = threadCounts;
		for(int i=0;i<tmp;i++) {
			if(i==tmp-1) {//最后一个线程承担剩下的所有元素的计算
				new Thread(new SubIntegerSumTask(list.subList(i*len,list.size()))).start();
			} else {
				new Thread(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1)))).start();
			}
		}
	}
	/**
	 * 分割计算List整数和的线程任务
	 *
	 */
	public class SubIntegerSumTask implements Runnable {
		private List<Integer> subList;
		public SubIntegerSumTask(List<Integer> subList) {
			this.subList=subList;
		}
		public void run() {
			long subSum=0L;
			for (Integer i : subList) {
				subSum += i;
			}
			synchronized(sum) {
				sum+=subSum;
			}
			System.out.println("分配给线程:"+Thread.currentThread().getName()+" 那一部分List的整数和为:"+subSum);
			if(--threadCounts == 0) {
				System.out.println("最后的结果为: ------- " + sum);
			}
		}
	}
	
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<Integer>();
        int threadCounts = 10;//采用的线程数
        //生成的List数据   
        for (int i = 1; i <= 1000000; i++) {
            list.add(i);   
        }
        CountListIntegerSum1 countListIntegerSum=new CountListIntegerSum1(list,threadCounts);   
        countListIntegerSum.getIntegerSum();   
    }
}

 

结果 写道
分配给线程:Thread-4 那一部分List的整数和为:45000050000
分配给线程:Thread-8 那一部分List的整数和为:85000050000
分配给线程:Thread-2 那一部分List的整数和为:25000050000
分配给线程:Thread-1 那一部分List的整数和为:15000050000
分配给线程:Thread-6 那一部分List的整数和为:65000050000
分配给线程:Thread-3 那一部分List的整数和为:35000050000
分配给线程:Thread-5 那一部分List的整数和为:55000050000
分配给线程:Thread-0 那一部分List的整数和为:5000050000
分配给线程:Thread-9 那一部分List的整数和为:95000050000
分配给线程:Thread-7 那一部分List的整数和为:75000050000
最后的结果为: ------- 500000500000

 

当然结果还是一样的,这里当所有线程运行完打印出结果,即threadCounts为0时打印。这里还有一个小问题本以为是不是sublist = list.subList(2, 4);就会生成新的list,原来不会。

小提示:list.subList并没有产生新的list

sublist = list.subList(2, 4);这里的sublist 只是RandomAccessSubList,所以对list操作会影响sublist,对sublist操作也会影响list。

public class ListSubList {
	public static void main(String[] args) {
        List<Integer> list = new ArrayList<Integer>();
        List<Integer> sublist = new ArrayList<Integer>();   
        //生成的List数据   
        for(int i = 1; i <= 10; i++) {
            list.add(i);   
        }
        sublist = list.subList(2, 4);
        System.out.println("sss");
//        sublist.clear();
        list.clear();
        
        System.out.println("sss");
        Iterator i = sublist.iterator();
        while(i.hasNext()) {
        	System.out.println(i.next());
        }
	}
}

 

在eclipse中设置一个断点,很容易可以看出来。

原来作者实现的理解 CyclicBarrier ExecutorService

public class CountListIntegerSum {
	private Long sum=0L;//存放整数的和
	private CyclicBarrier barrier;//障栅集合点(同步器)
	private List<Integer> list;//整数集合List
	private int threadCounts;//使用的线程数
	public CountListIntegerSum(List<Integer> list,int threadCounts) {
		this.list=list;
		this.threadCounts=threadCounts;
	}
	/**
	 * 获取List中所有整数的和
	 * @return
	 */
	public long getIntegerSum(){
		ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
		int len=list.size()/threadCounts;//平均分割List
		//List中的数量没有线程数多(很少存在)
		if(len==0){
			threadCounts=list.size();//采用一个线程处理List中的一个元素
			len=list.size()/threadCounts;//重新平均分割List
		}
		barrier=new CyclicBarrier(threadCounts+1);
		for(int i=0;i<threadCounts;i++){
			//创建线程任务
			if(i==threadCounts-1){//最后一个线程承担剩下的所有元素的计算
				exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size())));
			}else{
				exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1))));
			}
		}
		try {
			barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
		} catch (InterruptedException e) {
			System.out.println(Thread.currentThread().getName()+":Interrupted");
		} catch (BrokenBarrierException e) {
			System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
		}
		exec.shutdown();
		return sum;
	}
	/**
	 * 分割计算List整数和的线程任务
	 * @author lishuai
	 *
	 */
	public class SubIntegerSumTask implements Runnable {
		private List<Integer> subList;
		public SubIntegerSumTask(List<Integer> subList) {
			this.subList=subList;
		}
		
		public void run() {
			long subSum=0L;
			for (Integer i : subList) {
				subSum += i;
			}
			synchronized(sum){//在CountListIntegerSum对象上同步
				sum+=subSum;
			}
			System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
			try {
				barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
			} catch (InterruptedException e) {
				System.out.println(Thread.currentThread().getName()+":Interrupted");
			} catch (BrokenBarrierException e) {
				System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
			}
		}
	}
	
    public static void main(String[] args) {   
        List<Integer> list = new ArrayList<Integer>();   
        int threadCounts = 10;//采用的线程数   
        //生成的List数据   
        for (int i = 1; i <= 1000000; i++) {   
            list.add(i);   
        }
        CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts);   
        long sum=countListIntegerSum.getIntegerSum();   
        System.out.println("List中所有整数的和为:"+sum);   
    }
	
}

 

程序中采用了10个线程来分别计算一个大的list中的所有数的和,利用了CyclicBarrier,这个程序正好展示了如何利用CyclicBarrier,这里在新建CyclicBarrier 时传替了一个11参数进去,就是当有11个线程到达这个樟栅时,才放开所有的线程,让其运行barrier.await();之后的语句。这里正好也比较一下CyclicBarrier 与CountDownLatch(http://xmind.iteye.com/blog/717140)的区别,CyclicBarrier主要可以用于许多线程等待到同一时间点同时发行,CountDownLatch可用于一个线程等待多个线程。显然,这里更适合用CountDownLatch。同时这里还用到了ExecutorService,ExecutorService对于启动新建和启动线程提供了方便。

原来作者实现的理解 ExecutorService.invokeAll()  Callable  Future

public class CountSumWithCallable {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		int threadCounts =9;//使用的线程数
		long sum=0;
		ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
		List<Callable<Long>> callList=new ArrayList<Callable<Long>>();
		//生成很大的List
		List<Integer> list = new ArrayList<Integer>();
		for (int i = 1; i <= 1000000; i++) {
			list.add(i);
		}
		int len=list.size()/threadCounts;//平均分割List
		//List中的数量没有线程数多(很少存在)
		if(len==0){
			threadCounts=list.size();//采用一个线程处理List中的一个元素
			len=list.size()/threadCounts;//重新平均分割List
		}
		for(int i=0;i<threadCounts;i++){
			final List<Integer> subList;
			if(i==threadCounts-1){
				subList=list.subList(i*len,list.size());
			} else {
				subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1));
			}
			//采用匿名内部类实现
			callList.add(new Callable<Long>() {
				public Long call() throws Exception {
					long subSum=0L;
					for(Integer i:subList){
						subSum+=i;
					}
					System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
					return subSum;
				}
			});
		}
		List<Future<Long>> futureList=exec.invokeAll(callList);
		for(Future<Long> future:futureList) {
			sum+=future.get();
		}
		exec.shutdown();
		System.out.println(sum);
	}
}

 

最简单直接的一种实现方法,exec.invokeAll负责运行所有的callList,把结果返回到List<Future<Long>> ,程序一直阻塞到所有运行结束。很是适合这种把任务分割出去,最后又要汇总的情况。ExecutorService 不光只是提供了新建启动线程的方法,还提供了返回线程运行结果的方便啊。

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics