`

MapReduce,泛型,匿名内部类,Runnable和Callable

阅读更多

一时兴起,写了下面的小程序,程序用来实现MapReduce计算。并在客户端模拟一个可以用MapReduce的场景:有n个数字,1,2,3,... k ...,n-1,n。计算n个数字的平方和。程序提供了MapReduce方法和直接计算的方法分别计算结果,并比较每种计算方法的代价。程序会打印出所有计算的成本和最终计算结果。

 

事实上客户端完全可以定义自己的Dispense,Map和Reduce算法,如计算n个数字的立方和或统计日志。

 

程序中用到了以下的知识点:MapReduce,泛型,匿名内部类,多线程Runnable和Callable。
小子不才,计划以此程序为起点,做一个开源的MapReduce框架。希望朋友们和我一起讨论:程序中有哪些需要改进的地方?哪里有更好的实现方式。下面提供了程序的静态类图,时序图,api文档和源代码,和朋友们一起共勉。



                     mapreduce_paticipants

 



                                mapreduce_sequence

 

程序的原理如下:
见百度百科上对MapReduce的介绍MapReduce
Dispense(分发)操作负责把大规模的原始列表(原始键值对)切割成小的碎片,并分发给每个计算节点。并行计算节点可以是线程、进程或服务器。Dispense所处的位置可能是主线程或中央节点服务器。
Map(映射)操作是把一个原始列表(原始键值对组)中的每一个元素进行指定的映射操作并返回一个新的列表(新键值对组)。这里的每个元素都是被独立操作的,所以适合高度的并行。
本程序的客户端:
原始列表:1,2,... k ...,n-1,n。 映射操作: k -> k*k。 新的列表:1*1,2*2,k*k,(n-1)*(n-1),n*n。
原始键值对组:1->1,2->2,... k->k...,n-1->n-1,n->n。 映射操作: k -> k*k。 新的键值对:1->1*1,3->3*3,k->k*k,(n-1)->(n-1)*(n-1),n->n*n。
Reduce(化简)操作指的是对新的列表(新的键值组)的元素进行适当的合并。 本例中适当的合并就是对新列表中的每个元素(新键值对组中的每个值)进行求和的合 并。
从MapReduce定义中可以看出:Map(映射)操作适合那些可高度并行化的操作,Reduce(化简)操作是对并行化操作后的结果进行合并的操作。通常化简操作的并行能力较差,但不是不可以,中央节点会进行最后的化简。

 

代码实现:

1、map算法

定义map接口

public interface IMaper<E> {
public E map(E x);
}

 

客户端实现map接口

new IMaper<Integer>() {
	public Integer map(Integer x) {
								// 模拟Map计算的成本
								try {
										Thread.sleep(100);
								} catch (Exception e) {
										e.printStackTrace();
								}
								return x * x * x;
	}
}

 2、reduce算法

定义reduce接口

public interface IReducer<E> {
	public E reduce(E x, E y);
}

 

实现reduce接口

 

new IReducer<Integer>() {
	public Integer reduce(Integer x, Integer y) {
								// 模拟Reduce计算的成本
								try {
										Thread.sleep(50);
								} catch (Exception e) {
										e.printStackTrace();
								}
								if (x == null)
										x = 0;
								if (y == null)
										y = 0;
								return x + y;
	}
}

 3、模拟计算节点

public class CallNode<E> implements Callable<E> {
	......public E call() throws Exception {
		E sum = null;
		if (maper == null)
			throw new Exception("请给本计算节点" + this.toString() + "分配一个映射器对象");
		if (reducer == null)
			throw new Exception("请给本计算节点" + this.toString() + "分配一个化简器对象");
		for (E e : subLists) {
			sum = reducer.reduce(sum, maper.map(e));
		}
		return sum;
	} ......
}

 详细的代码实现请下载原代码。

  • 大小: 5.3 KB
  • 大小: 13.7 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics