`
fyting
  • 浏览: 215543 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

矩阵的并行运算

    博客分类:
  • Java
阅读更多
这篇文章只是简单使用java concurrent库写了个二维数组运算,并无什么实际意义和技术可言,请仅把我当做标题党。 更有价值的东西来自于以下留言:

Cappuccino 写道
文章的名字叫做《矩阵的并行运算》。既然叫这个名字,关键字想必应该和矩阵,或者是并行计算相关。但是通篇里面除了一个二维数组的简单加减运算,和JDK里面现成的线程池以外,没看到什么其他太多的东西。

个人认为整篇东西里面很多东西都很值得深入的,譬如线程中的互斥是怎样实现的,lock、开关量,或者是Atom变量。或者是矩阵,可以尝试有没有其他更好,更有效率的办法。如果是深入JDK的一些办法,也可以尝试看一下源码,TreeMap和HashMap到底有什么不同,为什么选这个,另外Collections.synchronizedMap是怎样实现的,让一个集合可以变为同步的。

感觉上Java最大的误导是导致使用Java的人认为他们可以用JDK实现非常非常多的东西,但实际上我们都只是在用着Sun实现的很多东西罢了,而且也并没有去弄清楚一下一个东西它内部是怎样做的,怎么样才是最好最有效的。建议你可以尝试深入很多东西,多弄点底层的东西也算是好事,毕竟软件也是相对越底层相对越吃香的o(∩_∩)o


----------------------------------------------------------------------
原文:

最近正被项目的并发问题搞得焦头烂额,在封装的语义和项目需求上似乎都出现了问题,迟迟实现不了。昨天无意中看到这个招聘题,挺有意思的。出题人的本意可能是想考考时下流行的并行运算,受限于CPU,在PC上这种纯运算的效果并不明显,但并发倒是以后的一种趋势…
引用
- 使用Java多线程实现下述算法:
输入:整数组成的m*n的矩阵A。(m=100000, n=10000)
输出:一个数列B,数列B中的每一项为矩阵A中对应列数字之和

CPU的核心是越来越多了,以后并行运算是一种趋势,越来越多的程序需要考虑使用并行运算来加速,硬件厂商给了我们翻倍的CPU,而不是翻倍的速度,真麻烦 ……

首先定义一个类,就叫做MatrixSolver吧
package fyting.iteye.com;
public class MatrixSolver
{
	private final int[][] matrix;
	private final int maxThreads;
	private final Map<Integer, Integer> results = Collections.synchronizedMap(new TreeMap<Integer, Integer>());
	public void start(){
		//...
	}
}

写一个main方法来测试:
public static void main(String[] args)
{
	final int row = 5000, column = 2000;
	final int[][] data = new int[row][column];
	for(int i = 0 ; i < row; i++) {
		for(int j = 0; j < column; j++) {
			data[i][j] = j;
		}
	}
	final int maxThreads = 3;
	MatrixSolver solver = new MatrixSolver(maxThreads, data);
	solver.start();
}

然后是实现的代码,利用JDK5的线程池,可以很轻松实现:
(事实上,JDK里CyclicBarrier的javadoc就是一个解矩阵的例子)
public void start()
{
	ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
	final long st = System.nanoTime();
	if(this.matrix.length == 0) {
		System.err.println("no matrix data!");
		return;
	}
	final int columnCount = this.matrix[0].length;
	final CountDownLatch latch = new CountDownLatch(columnCount);
	for(int i = 0 ; i < columnCount; i++) {
		final int column = i;
		executor.execute(newTask(column,latch));
	}
	try {
		latch.await();
		executor.shutdown();
		long et = System.nanoTime();
		long time = (et - st) / (1000*1000);
		System.out.println("success, time: " + time + "ms");
		executor.awaitTermination(1, TimeUnit.SECONDS);
	}
	catch(InterruptedException e) {
		e.printStackTrace();
	}
	System.out.println("maxtrix: " + (matrix.length + "*" + matrix[0].length));
	System.out.println("column0==>" + ArrayUtils.toString(matrix[0]));
	System.out.println("results==>" + results);
}

protected Runnable newTask(int column, CountDownLatch latch)
{
	return new ColumnSolver(column,latch);
}

private class ColumnSolver extends Thread
{
	private final int columnIndex;
	private final CountDownLatch latch;

	ColumnSolver(int columnIndex, CountDownLatch latch)
	{
		this.columnIndex = columnIndex;
		this.latch = latch;
	}

	@Override
	public void run()
	{
		int count = 0;
		for(int j = 0, rowCount = matrix.length; j < rowCount; j++) {
			count += matrix[j][columnIndex];
		}
		results.put(columnIndex, count);
		latch.countDown();
	}
}

当然,如果是JDK1.4,可以用单独的Doug Lea大师写的java concurrent包,否则需要自己实现线程池了,很麻烦。如下面这样的代码,很容易出错,实在是没有必要在项目中采用。

class ThreadPool
{
	private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
	private final Thread[] workers;

	public ThreadPool(int maxThreads)
	{
		if(maxThreads <= 0) throw new IllegalArgumentException("maxThreads must > 0");
		this.workers = new Thread[maxThreads];
		for(int i = 0; i < maxThreads; i++) {
			workers[i] = new WorkThread();
			workers[i].start();
		}
	}

	private class WorkThread extends Thread
	{
		public void run() {
            Runnable r;
            while(true) {
                synchronized(tasks) {
                    while (tasks.isEmpty()) {
                        try
                        {
                        	tasks.wait();
                        }
                        catch (InterruptedException ex)
                        {
//                        	ex.printStackTrace();
                        	return;
                        }
                    }
                    r = (Runnable) tasks.removeFirst();
                }
                try {
                    r.run();
                }
                catch (RuntimeException e) {
                	e.printStackTrace();
                }
                synchronized(ThreadPool.this) {
                	ThreadPool.this.notifyAll();
//                	System.out.println(tasks.size() + " remaining");
                }
            }
        }
	}

	synchronized void waitAndShutdown() throws InterruptedException
	{
		while(this.tasks.size() != 0) {
			wait();
		}
		for(Thread t: workers) {
			synchronized(t) {
				t.interrupt();
			}
		}
	}

	public void submit(Runnable runnable)
	{
		 synchronized(this.tasks) {
			 this.tasks.addLast(runnable);
			 this.tasks.notify();
	    }
	}
}

附件中有3个不同的类,分别是利用JDK1.5和JDK1.4,以及单线程的实现。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics