/*
MAP REDUCE 的计算框架
INPUT -> MAP-> COMBINER -> REDUCER -> OUTPUT
计算的每个步骤皆以KEY,VALUE键值对作为输入,输出参数。
参数的类型为HADOOP封装的类型,加快数据的网络传输。
在计算之前,先对数据进行分片,通常情况下,一个分片对应一个64M的数据块,每个分片对应一个TASK.
通过分片实现计算数据本地化,若一行记录被分成两个不同的数据块,则HADOOP会将另外一个数据块的
剩余记录读取到本地,形成一个分片。
INPUT: 数据的输入路径
MAP: 输入KEY参数为每行所在文件的偏移量,输入VALUE参数为每个内容。此步骤主要是做数据的预处理,挑选出需要处理的数据。
REDUCER: 输入参数为MAP的输出参数,对数据进行加工处理。
COMBINER: 减少MAP节点到REDUCER节点的传输数据量,而在MAP之后进行的分片内的数据计算处理。
OUTPUT: 数据的输出路径
//AVG的实现
--打包
javac -classpath ../hadoop-core-1.1.2.jar *.java
jar cvf ./WetherAvg.jar ./*.class
*/
bin/hadoop jar ./AvgTemperature.jar AvgTemperature ./in/sample.txt ./out10
打包后需注意把myclass的class文件删除掉。
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//Mapper 类是个泛型类,四个形参
//input key,input value,output key,output value
//
//
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
private static final int MISSING = 9999;
@Override
//Hadoop提供了一系列基础的类型,便于网络序列化传输longwriteable=long
//text=string
//Called once for each key/value pair in the input split.
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
//map() method also provides an instance of Context to write the output to。
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//reducer函数也有四个形参用于指定输入和输出类型reduce的函数输入类型必须与map函数的输出类型匹配
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
//实现这个Iterable接口允许对象成为 "foreach" 语句的目标。
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for (IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
//reducer() method also provides an instance of Context to write the output to。
context.write(key, new IntWritable(maxValue));
}
}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//A Job object forms the specification of the job and gives you control over how the job
//is run.
//When we run this job on a Hadoop cluster, we will package the code into a JAR
//file (which Hadoop will distribute around the cluster).
//
//
//
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));//define the input data path
FileOutputFormat.setOutputPath(job, new Path(args[1]));//define the output data path The directory shouldn’t exist before running the job
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);//Submit the job to the cluster and wait for it to finish.
}
}
----------------------------------------------------------------------------------------
--实现平均天气稳定的代码
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text>
{
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new Text(String.valueOf(airTemperature)));
}
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
int sum = 0,count = 0;
for (Text intvalue : values) {
count++;
sum += Integer.parseInt(intvalue.toString());
}
context.write(key, new Text(sum+","+count));
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
int sum = 0,count = 0;
for (Text value : values) {
String[] sp = value.toString().split(",");
sum += Integer.parseInt(sp[0]);
count += Integer.parseInt(sp[1]);
}
context.write(key, new Text((sum/count)+""));
}
}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(AvgTemperature.class);
job.setJobName("Avg temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(AvgTemperatureMapper.class);
job.setCombinerClass(AvgTemperatureCombiner.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
----------------------------------------------------------------------------------------------------------------------------------/**
* Hadoop网络课程作业程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Exercise_1 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text>
{
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String month = lineSplit[0];
String time = lineSplit[1];
String mac = lineSplit[6];
/** 需要注意的部分 **/
String name = context.getConfiguration().get("name");
Text out = new Text(name + ' ' + month + ' ' + time + ' ' + mac);
/** 需要注意的部分 **/
context.write( NullWritable.get(), out); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
/** 需要注意的部分 **/
conf.set("name", args[2]);
/** 需要注意的部分 **/
Job job = new Job(conf, "Exercise_1"); //任务名
job.setJarByClass(Exercise_1.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( NullWritable.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 3 )
{
System.err.println("");
System.err.println("Usage: Test_1 < input path > < output path > < name >");
System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output hadoop");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Exercise_1(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Hadoop网络课程模板程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 有Reducer版本
*/
public class Test_2 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String anum = lineSplit[0];
String bnum = lineSplit[1];
context.write( new Text(bnum), new Text(anum) ); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
/**
* REDUCE任务
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce ( Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException
{
String valueString;
String out = "";
String name = context.getConfiguration().get("name");
for ( Text value : values )
{
valueString = value.toString();
out += valueString + "|";
}
out+=name;
context.write( key, new Text(out) );
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
conf.set("name", args[2]);
Job job = new Job(conf, "Test_2"); //任务名
job.setJarByClass(Test_2.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setReducerClass ( Reduce.class ); //调用上面Reduce类作为Reduce任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( Text.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 2 )
{
System.err.println("");
System.err.println("Usage: Test_2 < input path > < output path > ");
System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Test_2(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
相关推荐
mapreduce 编程 此示例程序将让您提取有用的统计数据,例如排名前 10 的平均评分电影、使用 Hadoop map-reduce 框架以及链接多个映射器和化简器对 200 万条记录进行基于流派的过滤
Map-ReduceAnagrams 这是在 map-reduce 编程模型中查找字谜的基本实现。 它是在 Hadoop 文件系统中实现的。 要正常工作,您必须提供两个参数: 输入文件的路径(英文单词) 输出文件的路径(字谜) 如果您有任何问题...
hadoop map reduce 的中文简易教程,能轻松帮助普通用户不需了解太多hadoop底层知识就能实现分布式编程,很好的入门教程。
MapReduce编程:使用hadoop计算维基百科文章的内部PageRank。 本课程向您介绍编程和数据操作的MapReduce模型。 它将提供有限的实践经验来分析真实的数据源:维基百科。 数据: 为了完成此任务,已向您提供了许多...
Hadoop为分布式编程提供了一个理想的平台,普通的程序员只要理解了分布式的特点,就可以轻易地实现分布式计算,而不需要理解分布式的细节。本文用实例讲解了在Eclipse下,使用Hadoop对数据集的统计度量的实现过程。 ...
cascading是在hadoop基础之上的map-reduce编程框架。能够简化hadoop上的分布式编程。目前应用在amazon ec2上。一个非常好的工具。
Hadoop 平台的实现原理源自Google 提出的Map-Reduce 编程模型 和GFS 分布式存储系统,在海量的非结构化数据的处理方面有着其他平台难以匹敌的优势。 10 本文在介绍了如何使用Hadoop搭建云计算平台的同时介绍了如何对...
3.2 C语言Map-Reduce程序示例 6 3.2.1计算任务 6 3.2.2 Mapper算法设计 7 3.2.3 Reducer算法设计 8 3.2.4 作业提交命令 9 3.3 shell Map-Reduce程序示例 9 3.3.1计算任务 9 3.3.2 map实现 10 3.3.3 reduce实现 11 ...
hadoop实战中文版pdf,主要讲解如何搭建hadoop集群,SSH免密码登录,map-reduce编程以及实战项目,其中也包括很多hadoop核心类的解读
4.启动eclipse,检查插件是否运行成功(有DFS Locations的东东就是ok的) 若是没有,也别着急,在window-perspective-openperspective-other中将map/reduce打开 5.Mr配置以及jar包导入参考mapreduce编程ppt 6.别...
详细介绍基于hadoop的mapreduce编程,基本原理。hadoop架构,map的处理方式,reduce的处理输入输出等。
使用MapReduce实现多个文本文件中WordCount词频统计功能,实验编写Map处理逻辑、编写Reduce处理逻辑、编写main方法。 二.实验目的 1、通过实验掌握基本的MapReduce编程方法。 2、实现统计HDFS系统中多个文本文件中...
HDFS 的直接编程控制和运行 map reduce 作业 - 所有单元测试都从 IDE 运行,该项目内置了 hadoop 配置并在 git 控制下。 通过实现 Web REST API 客户端将文件上传到 HDFS。 异步文件上传(在撰写本文时仅实现 ...
支持不同类型的作业,例如Hadoop Map-Reduce,管道,流,Pig,Hive和自定义Java应用程序。 基于频率和/或数据可用性的工作流计划。 监视功能,自动重试和失败处理。 可扩展和可插入的体系结构允许任意网格编程范例...
了Map/Reduce编程模型运行原理及其优点,其次介绍 了Map/Reduce模型的开源实现版本——Hadoop分布 式处理平台,在此基础上将搜索引擎的爬行器、索引器和 查询器三个功能模块按照Map/Reduce模型进行设计, 充分利用...
Spring Data 项目的目的是为了简化构建基于 Spring 框架应用的数据访问计数,包括非关系数据库、Map-Reduce 框架、云数据服务等等;另外也包含对关系数据库的访问支持。Spring Data 包含多个子项目:Commons - 提供...
6.4 实现Map/Reduce的C语言实例 6.5 建立Hadoop开发环境 6.5.1 相关准备工作 6.5.2 JDK的安装配置 6.5.3 下载、解压Hadoop, 配置Hadoop环境变量 6.5.4 修改Hadoop配置文件 6.5.5 将配置好的Hadoop文件复制到...
您还可以找到如何在Map Reduce中编写自定义数据类型和自定义分区程序。 #trendfinder文件夹:在Trendfinder文件夹中,您将发现如何使用多个Mappers和Reducers。 在这里,我们根据推文的出现来处理推特数据。 #...