
Hadoop: Data Join


Reduce-side joining / repartitioned sort-merge join

Note: DataJoinReducerBase, on the other hand, is the workhorse of the datajoin package, and it simplifies our programming by performing a full outer join for us. Our reducer subclass only has to implement the combine() method to filter out unwanted combinations and get the desired join operation (inner join, left outer join, etc.). It is also in the combine() method that we format the combination into the appropriate output format.


When running the sample code from <<Hadoop in Action>> chapter 5, some errors come up; see

http://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin

The corrected code looks like this:

package com.test.datamine.topic;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

	public static class MapClass extends DataJoinMapperBase {

		protected Text generateInputTag(String inputFile) {
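			// The tag identifies the record's data source. It is derived from
			// the input file name handed in by the framework: everything
			// before the first "-".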
			String datasource = inputFile.split("-")[0];
			return new Text(datasource);
		}

		protected Text generateGroupKey(TaggedMapOutput aRecord) {
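			// The join (group) key is the first comma-separated field of the record.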
			String line = ((Text) aRecord.getData()).toString();
			String[] tokens = line.split(",");
			String groupKey = tokens[0];
			return new Text(groupKey);
		}

		protected TaggedMapOutput generateTaggedMapOutput(Object value) {
			TaggedWritable retv = new TaggedWritable((Text) value);
			retv.setTag(this.inputTag);
			return retv;
		}
	}

	public static class Reduce extends DataJoinReducerBase {

		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
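			// Called once per combination of values sharing a group key, with
			// one value per data source. Returning null when only one source
			// is present turns the default full outer join into an inner join.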
			if (tags.length < 2)
				return null;
			String joinedStr = "";
			for (int i = 0; i < values.length; i++) {
				if (i > 0)
					joinedStr += ",";
				TaggedWritable tw = (TaggedWritable) values[i];
				String line = ((Text) tw.getData()).toString();
				String[] tokens = line.split(",", 2);
				joinedStr += tokens[1];
			}
			TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
			retv.setTag((Text) tags[0]);
			return retv;
		}
	}

	public static class TaggedWritable extends TaggedMapOutput {

		private Writable data;

		public TaggedWritable() {
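			// Hadoop instantiates TaggedWritable through reflection when
			// deserializing, so tag must be initialized here or readFields()
			// will hit a NullPointerException.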
			this.tag = new Text();
		}

		public TaggedWritable(Writable data) {
			this.tag = new Text("");
			this.data = data;
		}

		public Writable getData() {
			return data;
		}

		public void setData(Writable data) {
			this.data = data;
		}

		public void write(DataOutput out) throws IOException {
			// Write the tag, then the concrete class name of data, then the
			// data itself, so readFields() can re-create the right type.
			this.tag.write(out);
			out.writeUTF(this.data.getClass().getName());
			this.data.write(out);
		}

		public void readFields(DataInput in) throws IOException {
			// Read the tag, then the class name written by write(), then
			// re-create and populate the data instance.
			this.tag.readFields(in);
			String dataClz = in.readUTF();
			if (this.data == null
					|| !this.data.getClass().getName().equals(dataClz)) {
				try {
					this.data = (Writable) ReflectionUtils.newInstance(
							Class.forName(dataClz), null);
				} catch (ClassNotFoundException e) {
					// Fail the task instead of swallowing the error; otherwise
					// data may be null or of the wrong type when it is read.
					throw new IOException("unknown Writable class: " + dataClz, e);
				}
			}
			this.data.readFields(in);
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		JobConf job = new JobConf(conf, DataJoin.class);

		Path in = new Path(args[0]);
		Path out = new Path(args[1]);
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);

		job.setJobName("DataJoin");
		job.setMapperClass(MapClass.class);
		job.setReducerClass(Reduce.class);

		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TaggedWritable.class);
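		// Separate the output key (the join key) from the joined value with a
		// comma instead of the default tab, so each result line reads as CSV.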
		job.set("mapred.textoutputformat.separator", ",");

		// Delete any existing output directory, otherwise the job fails on startup.
		FileSystem fs = in.getFileSystem(conf);
		fs.delete(out, true);

		JobClient.runJob(job);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DataJoin(), args);

		System.exit(res);
	}
}

 

The test data consists of two files:

customers

1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

orders

3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009

 

The joined result is:

1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
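
Note that customer 4 (David Stork) does not appear in the output: combine() returns null whenever a group key shows up under fewer than two tags, so the join above is an inner join. As a rough sketch (not from the book), combine() could be rewritten as a left outer join that also keeps customers without orders. The check below simply assumes that the tag generated for the customers file contains the string "customers", since the tag is derived from the input file path:

	protected TaggedMapOutput combine(Object[] tags, Object[] values) {
		// Keep a group that only has a customer record, but drop a group that
		// only has orders. Assumes the customers input path (and therefore its
		// tag) contains the string "customers".
		if (tags.length < 2 && !tags[0].toString().contains("customers"))
			return null;
		String joinedStr = "";
		for (int i = 0; i < values.length; i++) {
			if (i > 0)
				joinedStr += ",";
			TaggedWritable tw = (TaggedWritable) values[i];
			String line = ((Text) tw.getData()).toString();
			// Strip the join key from each record, as in the inner-join version.
			joinedStr += line.split(",", 2)[1];
		}
		TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
		retv.setTag((Text) tags[0]);
		return retv;
	}

With this variant the output should also contain the line 4,David Stork,408-555-0000.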

 

 

 
