Note:
1. 内容主要参考<Hadoop in Action> Chapter 5.2
2. 代码主要参考: http://qindongliang.iteye.com/blog/2052842
3. 这是基于老的API的实现,这种方法并不高效简洁
数据:(原始数据可以从movielens-1m里面去要,这里将原始数据进行了简单的修改方便演示与自测)
文件: u.user
结构: user id | age | gender | occupation | zip code
样例:
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213
6|42|M|executive|98101
7|57|M|administrator|91344
8|36|M|administrator|05201
9|29|M|student|01002
10|53|M|lawyer|90703
|
文件: u.data
结构: user id | item id | rating | timestamp.
样例:
1 242 3 881250949
1 302 3 891717742
2 377 1 878887116
4 51 2 880606923
6 346 1 886397596
8 474 4 884182806
10 265 2 881171488
1 465 5 891628467
2 451 3 886324817
6 86 3 883603013
|
任务:
将两个文件的内容连接起来,输出部分内容:(inner join)
输出结构: user id | age | rating
输出示例:
1age=24,ratings=3
1age=24,ratings=3
1age=24,ratings=5
10age=53,ratings=2
2age=53,ratings=1
2age=53,ratings=3
4age=24,ratings=2
6age=42,ratings=1
6age=42,ratings=3
8age=36,ratings=4
|
思路:
在Map函数之中处理各个文件每一行的值。并根据文件名为每一行的值加上一个groupKey.
在这个例子之中,groupKey即为user id, 比如 1,2,3 ...
经过map函数处理之后,会有如下数据出现:(假设将其json化)
[{ tag: 'u.user', value: '1|24|M|technician|85711' }, { tag: 'u.data', value: '1 242 3 881250949' }, { tag: 'u.data', value: '1 377 1 878887116' }]
Hadoop会将相同的groupKey的值放在一起,所以在Reduce函数之中,需要做的事情就是将这一系列的值合并在一起。注意:上面的list里面值的顺序是不固定的,无法确定u.user一定排在首位。
在演示最终代码之前,需要注意,英文版的<Hadoop in Action>上面的代码在我的版本上面是有问题的,需要修改如下几个地方:
1. 在TaggedWritable之中新增一个默认的构造方法,
public static class TaggedWritable extends TaggedMapOutput { public TaggedWritable() { } }
不然会提示如下错误: 原因是在反射的时候一定需要一个默认的构造函数
java.lang.NoSuchMethodException: ch5.ReduceSideJoin$TaggedWritable.<init>()
2. readFields方法要增加一些处理空的代码,否则会报NullException
public void readFields(DataInput in) throws IOException { this.tag.readFields(in); //加入此部分代码,否则,可能报空指针异常 String temp=in.readUTF(); if (this.data == null|| !this.data.getClass().getName().equals(temp)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(temp), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); }
最后实现的代码如下:
package ch5; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /*** * * Hadoop1.2的版本,旧版本实现的Reduce侧连接 * * @author qindongliang * * 大数据交流群:376932160 * 搜索技术交流群:324714439 * * */ public class ReduceSideJoin extends Configured implements Tool { /** * * Map实现 * * */ public static class MapClass extends DataJoinMapperBase { /** * 读取输入的文件路径 * * **/ protected Text generateInputTag(String inputFile) { //返回文件路径,做标记 return new Text(inputFile); } /*** * 分组的Key * * **/ protected Text generateGroupKey(TaggedMapOutput aRecord) { Text tag = aRecord.getTag(); String line = ((Text)aRecord.getData()).toString(); if(line.trim().length() < 2) return null; String sep = "\t"; if(tag.toString().contains("u.user")) { sep = "[|]"; } String[] tokens = line.split(sep); String groupKey = tokens[0]; return new Text(groupKey); } protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } /** * * Reduce进行笛卡尔积 * * **/ public static class Reduce extends DataJoinReducerBase { /*** * 笛卡尔积 * * */ protected TaggedMapOutput combine(Object[] tags, Object[] values) { if (tags.length < 2) return null; // // 开始连接两边的内容 String str = ""; String userInfo = ""; List<String> ratingDataList = new ArrayList<String>(); for(int i = 0 ; i < tags.length; i++) { Text curTag = (Text) tags[i]; String line = ((TaggedWritable)values[i]).getData().toString(); if(curTag.toString().contains("u.user")) { String[] tokens = line.split("[|]"); // 对于u.user 分隔符是| 并且只需要他的年龄这一列 userInfo = "age=" + tokens[1]; } else { String[] tokens = line.split("\t"); // 对于u.data 分隔符是制表符"\t" 需要的是ratings这一列 ratingDataList.add(tokens[2]); } } str = userInfo + ",ratings=" + StringUtils.join(ratingDataList, "|"); TaggedWritable retv = new TaggedWritable(new Text(str)); retv.setTag((Text) tags[0]); return retv; } } /** * * 自定义的输出类型 * * ***/ public static class TaggedWritable extends TaggedMapOutput { private Writable data; /** * 注意加上构造方法 * * */ public TaggedWritable() { } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } public Writable getData() { return data; } public void write(DataOutput out) throws IOException { this.tag.write(out); //此行代码很重要 out.writeUTF(this.data.getClass().getName()); this.data.write(out); } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); //加入此部分代码,否则,可能报空指针异常 String temp=in.readUTF(); if (this.data == null|| !this.data.getClass().getName().equals(temp)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(temp), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, ReduceSideJoin.class); job.setJarByClass(ReduceSideJoin.class); String path = "/home/hadoop/DataSet/movielens-output"; FileSystem fs=FileSystem.get(conf); Path out = new Path(path); if(fs.exists(out)){ fs.delete(out, true); System.out.println("输出路径存在,已删除!"); } Path in = new Path("/home/hadoop/DataSet/movielens"); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("ReduceSideJoin"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new ReduceSideJoin(), args); System.exit(res); } }
相关推荐
19、Join操作map side join 和 reduce side join 网址:https://blog.csdn.net/chenwewi520feng/article/details/130455477 本文介绍mapreduce的join操作。 本文前提是hadoop可以正常使用。 本文分为3个部分介绍,即...
The Joins query by using Hadoop and map reduce
展示使用MR方式实现表连接的代码示例。利用HIVE PIG之类的高层工具也可以实现,本代码旨在展示手工连接的流程
Hadoop中网络感知的Reduce任务放置算法,屈戈,张东旭,针对MapReduce中shuffle阶段的数据传输时延成为分布式计算应用性能瓶颈问题,本文提出一种网络感知的Hadoop任务放置算法,该算法通过将Re
在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path追加%HADOOP_HOME%\bin,有可能出现如下错误: org.apache.hadoop.io.nativeio.NativeIO$Windows....
hadoop开发文档
hadoop中map/reduce自学资料合集
讲述了Windows平台的Hadoop安装... 最后,以最简单的求和为例,剖析Hadoop的Map/Reduce工作机制,对于初学Hadoop及Map/Reduce的读者有很大的帮助。相信通过最简单的求和为例,读者可步入Hadoop的Map/Reduce开发者行列。
Hadoop使用常见问题以及解决方法,简单实用
hadoop使用distcp问题解决 然后用distcp从1.0.3的集群拷数据到2.0.1的集群中。 遇到问题处理
项目负责人tomwhite透过本书详细阐述了如何使用hadoop构建可靠、可伸缩的分布式系统,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装和运行hadoop集群。 本书结合丰富的案例来展示如何用hadoop...
摘要 Hadoop 是一个处理、存储和分析海量的...Hadoop 和 Google 内部使用的分布式计算系统原理相同,其开源特性使其成为分布式计算系统的事实上的国际标准。 Yahoo、Facebook、Amazon,以及国内的百度、阿里巴巴等众多
《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf...
充分利用集群的威力进行高速运算和存储。Hadoop实现了一个分布式文件系统( Distributed File System),其中一个组件是HDFS(Hadoop Distributed File System)。HDFS有高容错性的特点,并且设计用来部署在低廉的...
hadoop的map reduce 学习手册,很实用
Hadoop使用常见问题以及解决方法.doc Hadoop使用常见问题以及解决方法.doc
hadoop 使用 maven3.3 仓库 5hadoop 使用 maven3.3 仓库 7
全面教你在Linux上使用hadoop 启动与关闭 启动HADOOP 1. 进入HADOOP_HOME目录。 2. 执行sh bin/start-all.sh 单个起 :/opt/hadoop-1.0.3/bin/hadoop-daemon.sh start datanode /opt/hadoop-1.0.3/bin/hadoop-...
hadoop 使用 maven3.3 仓库3
官网下载速度非常缓慢,因此将hadoop-3.3.4 版本放在这里,欢迎大家来下载使用! Hadoop 架构是一个开源的、基于 Java 的编程框架,设计用于跨电脑集群来 处理大数据。Hadoop 是一个能够让用户轻松搭建和使用的...