`

hadoop 2.x wordcount练习

 
阅读更多

 

 

 

一、 本地环境运行:(也可以本地程序调用hdfs的数据,但必须指定运行的用户,或者将分布式数据权限改成所有人都可以读写,否则权限异常elipse中可以设置-DHADOOP_USER_NAME=hadoop )

 程序不在集群中运行。(数据可以是本地地址 也可以是hdfs地址(hdfs://cloud:9000/wc/wordcount/input))

1 设置环境

    HADOOP_HOME E:\source_src\hadoop-2.5.2

    path中添加 ;%HADOOP_HOME%\bin;

 2 winutils工具包添加到hadoop的bin目录中(见附件)

 

二、   elipse 运行mapreduce程序在yarn的集群中,方便断点调试 ,最好在linux的elipse运行,

  window上运行可能会出现兼容问题(不好弄)

  1 必须指定程序在yarn中运行(拷贝mapred-site.xml 和yarn-site.xml到src下面,如果不添加这些配置则程序是运行在本地,不会提交集群)

     也可以conf.set来手动设置

  2 指定运行的jar包 conf.set(mapreduce.job.jar,"winjob.jar")

     jar包是在elipse中打好放到src下面的

 三、直接在hadoop中执行jar包 

 

    注意:map和reduce都有 setup 和cleanup方法,在执行前和执行后都可以执行一些方法。

 

1 WCMapper 

  

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text , Text, LongWritable >{

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		 //key 是这行数据的偏移量  ,value 这一行的数据
		String line = value.toString();
		String[] words  = StringUtils.split(line, " ");
		for(String word :words ){
			context.write(new Text(word) , new LongWritable(1) );; 
		}
		 
	}
}

 2 WCReducer 

 

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
	
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			 Context context)
			throws IOException, InterruptedException {
	 
		long count 	= 0 ;
		for(LongWritable value : values ){
			count = count+value.get() ; 
		}
		context.write(new Text(key),new LongWritable( count) );
	}

}

 

 3  主类 job

    

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class WCRunner {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration() ; 
		Job job = Job.getInstance(conf) ;
		
		job.setJarByClass(WCRunner.class );
		
		job.setMapperClass( WCMapper.class );
		job.setReducerClass( WCReducer.class );
		
		job.setOutputValueClass( Text.class );
		job.setOutputValueClass(LongWritable.class );
		
		job.setMapOutputKeyClass( Text.class);
		job.setMapOutputValueClass( LongWritable.class );
		//默认用分布式的环境中运行
		FileInputFormat.setInputPaths(job,  "/wc/srcdata");
		FileOutputFormat.setOutputPath(job,  new Path("/wc/output"));
//              直接指定分布式环境上拿去数据,用window来运行程序方便断点调试, 但必须指定运行的用户(-DHADOOP_USER_NAME=hadoop ),否则权限异常
//		FileInputFormat.setInputPaths(job,  "hdfs://cloud1:9000/wc/srcdata");
//		FileOutputFormat.setOutputPath(job,  new Path("hdfs://cloud1:9000/wc/output"));
                //本地环境运行,输入和输出用本地路径即可,配置HADOOP_HOME环境变量并且添加winutils工具包(拷贝到hadoop的bin目录中)
//		FileInputFormat.setInputPaths(job,  "D:\\wordcount\\wordcount.txt");
//		FileOutputFormat.setOutputPath(job,  new Path("D:\\wordcount\\output"));
		job.waitForCompletion(true) ; 
		
	}
}

 4 运行 hadoop jar wordcount.jar  ,如果打包的时候没有设置主类则需要写主类全名,否则不用。

 

 注意:mapreduce程序也可以在windows下运行,但问题比较多,还需要winutils的工具包,需要设置hadoop的bin目录到环境变量下。

 流程图:

 

  • 大小: 55.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics