`
jsh0401
  • 浏览: 10950 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

mapred代码示例--旧api的写法

 
阅读更多
package old;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
* 旧的api写法
*
* @author Administrator
*
*/
/**
 * WordCount written against the old (org.apache.hadoop.mapred) MapReduce API.
 *
 * <p>Reads tab-delimited text from {@code INPUT_PATH}, counts occurrences of
 * each token, and writes (word, count) pairs to {@code OUT_PATH}. The output
 * directory is deleted first so the job can be rerun.
 */
public class WordCount {

    private static final String INPUT_PATH = "hdfs://hadoop:9000/in/hello";
    private static final String OUT_PATH = "hdfs://hadoop:9000/oldout";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf);
        job.setJarByClass(WordCount.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormat(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Resolve the FileSystem from the output Path itself. FileSystem.get(conf)
        // returns the *default* filesystem (file:/// unless fs.default.name is set),
        // which fails with "Wrong FS: hdfs://..., expected: file:///" when the path
        // is an explicit hdfs:// URI. Path.getFileSystem(conf) always matches the
        // path's own scheme, so it works in both local and (pseudo-)distributed mode.
        Path outPath = new Path(OUT_PATH);
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true); // true = recursive: the output is a directory
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.setOutputFormat(TextOutputFormat.class);

        // Submit the job and block until it completes.
        JobClient.runJob(job);
    }

    /**
     * Mapper: splits each input line on tab characters and emits (token, 1)
     * for every token. Key/value Writables are reused across calls, which is
     * safe because the old API serializes them on each collect().
     */
    public static class MyMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {

        private final Text k2 = new Text();
        private final LongWritable v2 = new LongWritable();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            // NOTE(review): splits on "\t" only — space-separated words on a line
            // become a single token. Confirm the input really is tab-delimited.
            String[] tokens = value.toString().split("\t");
            for (String token : tokens) {
                k2.set(token);
                v2.set(1L);
                output.collect(k2, v2);
            }
        }
    }

    /**
     * Reducer: sums the per-word counts produced by the mapper (and, when a
     * combiner is configured, partial sums) and emits (word, total).
     */
    public static class MyReducer extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        private final LongWritable v3 = new LongWritable();

        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            long sum = 0;
            while (values.hasNext()) {
                sum += values.next().get(); // iterator is already typed; no cast needed
            }
            v3.set(sum);
            output.collect(key, v3);
        }
    }
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics