Reduce-side joining / repartitioned sort-merge join
Note: DataJoinReducerBase, on the other hand, is the workhorse of the datajoin package: it simplifies our programming by performing a full outer join for us. Our reducer subclass only has to implement the combine() method, which filters out unwanted combinations to produce the desired join operation (inner join, left outer join, etc.). It is also in combine() that we format each combination into the appropriate output format.
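To make the combine() contract concrete, here is a stand-alone sketch (plain Java, no Hadoop dependency; the class and method names are hypothetical) of the value-joining logic used below: the join key is dropped from every record with split(",", 2), because TextOutputFormat later prepends the group key itself via the "," separator.

```java
// Hypothetical stand-alone sketch of the string-joining step inside combine().
// Each input line is "key,rest..."; the key is stripped from every record and
// the remainders are concatenated, since the output format re-adds the key.
public class CombineSketch {
    public static String joinValues(String[] lines) {
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < lines.length; i++) {
            if (i > 0)
                joined.append(',');
            // split(",", 2) keeps everything after the first comma as one token
            joined.append(lines[i].split(",", 2)[1]);
        }
        return joined.toString();
    }
}
```

For example, joining a customer record with an order record for key 1 yields "Stephanie Leung,555-555-5555,B,88.25,20-May-2008"; with the group key prepended by the output format, this is exactly one output line of the job below.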
When you run the sample code from Hadoop in Action, chapter 5, several errors come up; see
http://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin
The corrected code looks like this:
package com.test.datamine.topic;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            // Tag each record with its data source, derived from the file name
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            // Group on the first comma-separated field (the join key)
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2)
                return null; // inner join: drop keys missing from a source
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                // split(",", 2) strips the join key from each record; the key
                // is re-added by TextOutputFormat as the output key
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // A no-arg constructor is required for reflective instantiation
        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            // Record the concrete class of data so readFields() can recreate it
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            // data is null after the no-arg constructor, so it must be
            // instantiated from the class name carried in the stream; this is
            // the fix for the NullPointerException in the book's version
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        // Remove any stale output directory before running the job
        FileSystem fs = in.getFileSystem(conf);
        fs.delete(out, true);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}
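The key fix over the book's version is in TaggedWritable.write()/readFields(): the reducer deserializes into a freshly constructed wrapper whose data field is null, so the stream itself must carry the payload's concrete class name. The following is a minimal re-creation of that idea in plain Java (no Hadoop on the classpath; MiniWritable and MiniText are hypothetical stand-ins for Writable and Text):

```java
import java.io.*;

// Hypothetical minimal model of the readFields() fix: the writer records the
// payload's class name on the wire, and the reader instantiates that class
// reflectively before asking it to read its own fields.
public class TaggedRoundTrip {

    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    public static class MiniText implements MiniWritable {
        public String value = "";
        public MiniText() {}
        public MiniText(String v) { value = v; }
        public void write(DataOutput out) throws IOException { out.writeUTF(value); }
        public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
    }

    // Mirrors TaggedWritable.write(): tag, then payload class name, then payload.
    public static byte[] serialize(String tag, MiniWritable data) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(tag);
        out.writeUTF(data.getClass().getName());
        data.write(out);
        return bytes.toByteArray();
    }

    // Mirrors the fixed readFields(): the payload starts out null, so it is
    // created from the class name read off the stream.
    public static MiniWritable deserialize(byte[] raw) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
        in.readUTF(); // the tag
        MiniWritable data = (MiniWritable) Class.forName(in.readUTF())
                .getDeclaredConstructor().newInstance();
        data.readFields(in);
        return data;
    }
}
```

Without the class name on the wire, readFields() would have to call this.data.readFields(in) on a null reference, which is exactly the failure the Stack Overflow thread describes.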
The test data consists of two files:
customers
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
orders
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
The joined result is:
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
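The shape of this result can be checked without a cluster. The sketch below (plain Java, no Hadoop; class and method names are hypothetical) replays what the framework does with these two files: group both inputs by the first field, then emit one joined line per customer-order combination. This is why customer 3, who has two orders, appears twice, and why customer 4, who has none, is dropped by the inner join.

```java
import java.util.*;

// Illustrative re-implementation of the reduce-side join on the sample data:
// index orders by key, then emit the cross product within each key group.
public class JoinSample {
    public static List<String> join(List<String> customers, List<String> orders) {
        Map<String, List<String>> ordersByKey = new LinkedHashMap<>();
        for (String o : orders) {
            ordersByKey.computeIfAbsent(o.split(",", 2)[0],
                    k -> new ArrayList<>()).add(o);
        }
        List<String> result = new ArrayList<>();
        for (String c : customers) {
            String[] parts = c.split(",", 2); // parts[0] = key, parts[1] = rest
            for (String o : ordersByKey.getOrDefault(parts[0], List.of())) {
                // key, customer fields, then order fields with the key stripped
                result.add(parts[0] + "," + parts[1] + "," + o.split(",", 2)[1]);
            }
        }
        return result;
    }
}
```

Note that the MapReduce job does not guarantee the relative order of the two lines for customer 3; only the set of joined lines is determined.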