
MapReduce case study: data deduplication


Source data:

Contents of a.txt:

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

Contents of b.txt:

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c

Output (the union of a.txt and b.txt with duplicate records removed; the framework sorts keys during the shuffle, so the result also comes out ordered):

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

The approach: the Mapper emits each whole input record as the key with an empty Text as the value; during the shuffle the framework sorts and groups identical keys, so the Reducer receives each distinct record exactly once and simply writes the key back out.

//=================================== JAVA CODE =========================

package gq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Data deduplication
 * @author tallqi
 */

public class Dereplication {

    public static class Map extends Mapper<Object, Text, Text, Text> {

        // Emit the whole record as the key; the value carries no information.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("Map:" + value);
            context.write(value, new Text(""));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        // After the shuffle every distinct record arrives here exactly once,
        // so writing the key once per call removes the duplicates.
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("Reduce:" + key);
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "Dereplication");
        job.setJarByClass(Dereplication.class);

        // Set the Mapper, Combiner and Reducer classes.
        job.setMapperClass(Map.class);
        // The Reducer could also serve as the Combiner here, deduplicating on
        // the map side before the shuffle:
        // job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output paths on HDFS.
        FileInputFormat.addInputPath(job, new Path("hdfs://h0:9000/user/tallqi/in/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://h0:9000/user/tallqi/in/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
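
An empty Text value still gets serialized for every record that crosses the network. A common refinement, shown here only as a sketch that is not part of the original post, is to use NullWritable for the value and the newer Job.getInstance factory method (Hadoop 2.x and later); the class name, paths, and overall structure simply mirror the listing above:

//=================================== SKETCH: NullWritable variant =========================

package gq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Sketch of the same deduplication job using NullWritable values. */
public class DereplicationNullValue {

    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The record itself is the key; NullWritable serializes to nothing.
            context.write(value, NullWritable.get());
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "DereplicationNullValue");
        job.setJarByClass(DereplicationNullValue.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class); // map-side dedup before the shuffle
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Same example paths as in the listing above; adjust to your cluster.
        FileInputFormat.addInputPath(job, new Path("hdfs://h0:9000/user/tallqi/in/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://h0:9000/user/tallqi/in/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}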

 
