Writing MapReduce Programs on Windows

Setting up a Hadoop environment on Linux is fairly tedious, so to make writing and testing MapReduce programs more convenient, this article explains how to run a MapReduce program directly on Windows. The steps are as follows:
1. First, make Unix commands available on Windows with the following configuration:
Add D:\UnxUtils\bin;D:\UnxUtils\usr\local\wbin to the PATH environment variable (the UnxUtils archive is attached to this post).
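For example, to apply this to the current console session only (a sketch, assuming UnxUtils is unpacked to D:\UnxUtils; use the System Properties dialog to change PATH permanently):

set PATH=%PATH%;D:\UnxUtils\bin;D:\UnxUtils\usr\local\wbin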
2. Write the MapReduce program
a> Create a Java project that depends on hadoop-core-0.20.jar (I use Maven, so the transitive dependencies are pulled in automatically).
The complete dependency tree is as follows:
[INFO] --- maven-dependency-plugin:2.1:tree (default-cli) @ MapReduceDemo ---
[INFO] qhy.test:MapReduceDemo:jar:0.0.1-SNAPSHOT
[INFO] +- org.apache.hadoop:hadoop-core:jar:0.20.2:compile
[INFO] |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  +- commons-httpclient:commons-httpclient:jar:3.0.1:compile
[INFO] |  |  \- commons-logging:commons-logging:jar:1.0.3:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.3:compile
[INFO] |  +- commons-net:commons-net:jar:1.4.1:compile
[INFO] |  +- org.mortbay.jetty:jetty:jar:6.1.14:compile
[INFO] |  +- org.mortbay.jetty:jetty-util:jar:6.1.14:compile
[INFO] |  +- tomcat:jasper-runtime:jar:5.5.12:compile
[INFO] |  +- tomcat:jasper-compiler:jar:5.5.12:compile
[INFO] |  +- org.mortbay.jetty:jsp-api-2.1:jar:6.1.14:compile
[INFO] |  +- org.mortbay.jetty:jsp-2.1:jar:6.1.14:compile
[INFO] |  |  \- ant:ant:jar:1.6.5:compile
[INFO] |  +- commons-el:commons-el:jar:1.0:compile
[INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] |  +- org.mortbay.jetty:servlet-api-2.5:jar:6.1.14:compile
[INFO] |  +- net.sf.kosmosfs:kfs:jar:0.3:compile
[INFO] |  +- junit:junit:jar:4.5:compile
[INFO] |  +- hsqldb:hsqldb:jar:1.8.0.10:compile
[INFO] |  +- oro:oro:jar:2.0.8:compile
[INFO] |  \- org.eclipse.jdt:core:jar:3.1.1:compile
[INFO] \- commons-io:commons-io:jar:2.4:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
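For reference, the top-level pom.xml dependencies that produce this tree would look roughly like the following (a sketch reconstructed from the coordinates in the tree above, not the project's actual pom):

<dependencies>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-core</artifactId>
		<version>0.20.2</version>
	</dependency>
	<dependency>
		<groupId>commons-io</groupId>
		<artifactId>commons-io</artifactId>
		<version>2.4</version>
	</dependency>
</dependencies>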

b> The MapReduce program:
The job is very simple: it computes the average of the values for each year (for example, every 2014 reading below is 12, so the 2014 average is 12).
The input file (D:\mr\in\a.txt) contains:

20140301 12
20140302 12
20140304 12
20140305 12
20140306 12
20140307 12
20140308 12
20140309 12
20140310 12
20130301 13
20130302 13
20130304 13
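Each line is an 8-character date (yyyyMMdd), a space, and an integer value; the mapper below takes the first four characters as the year and everything from index 9 onward as the value.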

The mapper class (MyMapper):
package com.demo.mapred.max;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {
		// Each input line looks like "20140301 12": the first four
		// characters are the year, everything after the space is the value.
		String year = value.toString().substring(0, 4);
		String number = value.toString().substring(9);
		System.out.println("MAA|mapper----" + year + "========>" + number);
		context.write(new Text(year), new IntWritable(Integer.parseInt(number)));
	}
}

The reducer class (MyReduce):
package com.demo.mapred.max;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int avg = getAvg(values.iterator());
		System.out.println("MAA|reduce----" + key.toString() + "========>" + avg);
		context.write(key, new IntWritable(avg));
	}

	// Not used by reduce(); computes the maximum of the values instead.
	private int getMax(Iterator<IntWritable> iterator) {
		int max = 0;
		while (iterator.hasNext()) {
			int num = iterator.next().get();
			if (num > max) {
				max = num;
			}
		}
		return max;
	}

	// Integer average of all values for one key.
	private int getAvg(Iterator<IntWritable> iterator) {
		int sum = 0;
		int count = 0;
		while (iterator.hasNext()) {
			count++;
			sum += iterator.next().get();
		}
		return sum / count;
	}
}

The driver that runs the MapReduce job:
package com.demo.mapred.max;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestRunJob {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			String inPath = "D:\\mr\\in\\a.txt";
			// Timestamped output directory so each run writes to a fresh path.
			String outPath = "D:\\mr\\out\\MAX" + System.currentTimeMillis();

			Job job = new Job(conf, "yearly average");
			job.setJarByClass(TestRunJob.class);
			job.setMapperClass(MyMapper.class);
			job.setReducerClass(MyReduce.class);
			job.setNumReduceTasks(1);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			FileInputFormat.addInputPath(job, new Path(inPath));
			FileOutputFormat.setOutputPath(job, new Path(outPath));

			System.exit(job.waitForCompletion(true) ? 0 : 1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
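Note that because the Configuration carries no cluster settings, Hadoop falls back to the LocalJobRunner (hence job_local_0001 in the output below) and reads and writes the local filesystem, which is why plain Windows paths work for the input and output.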

The run output:

2014-4-20 9:45:57 org.apache.hadoop.metrics.jvm.JvmMetrics init
INFO: Initializing JVM Metrics with processName=JobTracker, sessionId=
2014-4-20 9:45:57 org.apache.hadoop.mapred.JobClient configureCommandLineOptions
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-4-20 9:45:57 org.apache.hadoop.mapred.JobClient configureCommandLineOptions
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-4-20 9:45:57 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
2014-4-20 9:45:57 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
2014-4-20 9:45:57 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
2014-4-20 9:45:57 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
2014-4-20 9:45:57 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
2014-4-20 9:45:57 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2014========>12
MAA|mapper----2013========>13
MAA|mapper----2013========>13
MAA|mapper----2013========>13
2014-4-20 9:45:57 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
2014-4-20 9:45:57 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
2014-4-20 9:45:57 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-4-20 9:45:57 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-4-20 9:45:57 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
2014-4-20 9:45:57 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-4-20 9:45:57 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
2014-4-20 9:45:57 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 134 bytes
2014-4-20 9:45:57 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
MAA|reduce----2013========>13
MAA|reduce----2014========>12
2014-4-20 9:45:57 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-4-20 9:45:57 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-4-20 9:45:57 org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-4-20 9:45:57 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to D:/mr/out/MAX1397958356938
2014-4-20 9:45:57 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
2014-4-20 9:45:57 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
2014-4-20 9:45:58 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 100%
2014-4-20 9:45:58 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO: Counters: 12
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:   FileSystemCounters
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_READ=27182
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_WRITTEN=54292
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:   Map-Reduce Framework
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Reduce input groups=2
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Combine output records=0
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Map input records=12
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Reduce shuffle bytes=0
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Reduce output records=2
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Spilled Records=24
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Map output bytes=108
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Combine input records=0
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Map output records=12
2014-4-20 9:45:58 org.apache.hadoop.mapred.Counters log
INFO:     Reduce input records=12


Summary: the first time I ran this MapReduce program it failed. With the default TextInputFormat, the map input key is the line's byte offset, a LongWritable, so the mapper's first type parameter must be LongWritable (or a supertype such as Object, as used above); otherwise the framework falls back to the base Mapper's identity map and emits LongWritable keys, which conflicts with the declared Text map output key.
The exact error was:
WARNING: job_local_0001
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:845)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:541)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
2014-4-20 9:41:49 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
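The fix is to declare the mapper's input key type to match what TextInputFormat actually supplies. A minimal sketch of the corrected declaration (same logic as the mapper above, with the key typed explicitly):

package com.demo.mapred.max;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The input key is the line's byte offset (LongWritable), supplied by TextInputFormat.
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String year = value.toString().substring(0, 4);
		String number = value.toString().substring(9);
		context.write(new Text(year), new IntWritable(Integer.parseInt(number)));
	}
}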