`

Hadoop排序

阅读更多
数据排序是许多实际任务在执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。
本次实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。

实例描述:
对输入文件中的数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第二个数字代表原始数据,第一个数字代表这个原始数据在原始数据集中的位次。


样例输入:



样例输出:



程序代码
package com.songjy.hadoop.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

	public static class MyMapper extends
			Mapper<Object, Text, IntWritable, IntWritable> {

		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();
			IntWritable data = new IntWritable(Integer.parseInt(line));

			context.write(data, new IntWritable(1));
		}

	}

	public static class MyReducer extends
			Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

		private static IntWritable linenum = new IntWritable(1);

		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			for (IntWritable v : values) {
				context.write(linenum, key);
				linenum = new IntWritable(linenum.get() + 1);
			}

			// linenum = new IntWritable(linenum.get() + 1);//代码放在这输出结果是啥样呢?o(∩_∩)o 哈哈

		}

	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();

		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		if (otherArgs.length != 2) {
			System.out.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}

		Job job = new Job(conf, Sort.class.getName());
		job.setJarByClass(Sort.class);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		//job.setPartitionerClass(MyPartitioner.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}


以上引自书籍《Hadoop实战》第2版的第五章,不过我去掉了自定义Partition部分代码,从结果来看,输出结果仍是正确(参看上面已有截图),是否仍需要自定义Partition的必要,望大牛们指点!

Partition部分代码
	/**
	 * 自定义Partitioner函数,此函数根据输入数据的最大值和MapReduce框架中
	 * Partitioner的数量获取将输入数据按照大小分块的边界,然后根据输入数值和
	 * 边界的关系返回对应的Partitioner ID
	 */
	public static class MyPartitioner extends
			Partitioner<IntWritable, IntWritable> {

		@Override
		public int getPartition(IntWritable key, IntWritable value,
				int numPartitions) {

			System.out.println("numPartitions=" + numPartitions);

			int maxnum = 652232;

			int bound = maxnum / numPartitions + 1;

			System.out.println("bound=" + bound);

			int keynum = key.get();

			for (int i = 0; i < numPartitions; i++) {
				if ((keynum < (bound * i)) && (keynum >= (bound * (i - 1))))
					//return i - 1;
					return (i - 1) >= 0 ? (i - 1) : 0;//partition是从0开始的,默认的返回应该给个0
			}

			//return -1;
			return 0;//partition是从0开始的,默认的返回应该给个0

		}

	}


下面的错误信息是因为partition是从0开始的,默认的返回应该给个0

15/04/06 15:32:40 INFO mapred.JobClient:  map 0% reduce 0%
15/04/06 15:36:54 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000002_0, Status : FAILED
java.io.IOException: Illegal partition for 26 (-1)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:29)
        at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201503291109_0008_m_000002_0: numPartitions=1
attempt_201503291109_0008_m_000002_0: bound=652233
15/04/06 15:38:11 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000001_0, Status : FAILED
attempt_201503291109_0008_m_000001_0: numPartitions=1
attempt_201503291109_0008_m_000001_0: bound=652233
15/04/06 15:38:24 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000000_0, Status : FAILED
java.io.IOException: Illegal partition for 2 (-1)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:29)
        at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201503291109_0008_m_000000_0: numPartitions=1
attempt_201503291109_0008_m_000000_0: bound=652233
15/04/06 15:38:24 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000001_1, Status : FAILED
java.io.IOException: Illegal partition for 5956 (-1)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
  • 大小: 10.8 KB
  • 大小: 7.7 KB
  • 大小: 6.9 KB
分享到:
评论

相关推荐

    hadoop排序和google三大论文

    NULL 博文链接:https://xuyuanshuaaa.iteye.com/blog/1181687

    hadoop 二次排序 原理

    Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻

    hadoop分区二次排序示例.zip

    hadoop分区二次排序示例,对基站数据,按电话号码升序、到达时间降序进行排序

    hadoop实现分区二次排序代码示例.zip

    hadoop分区二次排序代码示例,包含基站数据集,对基站数据,按电话号码升序、到达时间降序进行排序,只需打包成jar,即可在hadoop集群中运行

    hadoop详细视频教程

    45_hadoop2.x_温度排序,分区,分组,自定义封装类02 46_hadoop2.x_温度排序,分区,分组,自定义封装类03 47_hadoop2.x_温度排序,分区,分组,自定义封装类04 48_hadoop2.x_温度排序,分区,分组,自定义封装类05 ...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi 06-shuffle机制.avi 07-mr程序的组件全貌.avi 08-textinputformat对切片规划的源码分析.avi 09-倒排索引的mr实现.avi 10-多个job在同一个...

    Hadoop大作业排序.zip

    Hadoop大作业排序代码 由于 MapReduce 中对 key 进行比较和排序,而 key 可以是任何实 现了 Writable 接口的类。 在 java 中,要实现类的大小比较可以实现 Comparable 接口并通 过重写 compareTo 方法来实现。 在 ...

    Hadoop应用案例分析:雅虎、eBay、百度、Facebook.pdf

    ,Hadoop 技术已经在互联网领域得到了广泛的应用。互联网公司往往需要 存储海量的数据并对其进行处理,而这正是Hadoop 的强项。如Facebook 使用Hadoop ...阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。

    论文研究-基于Hadoop的多关键字排序方法研究.pdf

    在单机环境下按多关键字对大数据排序需要较长的执行时间,为了提高按多关键字对大数据排序的效率,根据Hadoop的MapReduce模型,给出了两种基于Hadoop的多关键字排序方法。方法一在Reduce函数中使用链式基数排序算法...

    hadoop shuffle和排序1

    hadoop学习笔记-shuffle和排序 shuffle是指将map输出作为输入传给reduce的过程。

    Hadoop权威指南 第二版(中文版)

     在Apache Hadoop上的TB字节数量级排序  使用Pig和Wukong来探索10亿数量级边的 网络图  测量社区  每个人都在和我说话:Twitter回复关系图  degree(度)  对称链接  社区提取 附录A 安装Apache Hadoop  先决...

    Hadoop从入门到上手企业开发

    060 MapReduce执行流程之Shuffle和排序流程以及Map端分析 061 MapReduce执行流程之Reduce端分析 062 MapReduce Shuffle过程讲解和Map Shuffle Phase讲解 063 Reduce Shuffle Phase讲解 064 源代码跟踪查看Map Task和...

    大数据 hadoop mapreduce 词频统计

    在hadoop平台上,用mapreduce编程实现大数据的词频统计

    Hadoop权威指南(中文版)2015上传.rar

    在Apache Hadoop上的TB字节数量级排序 使用Pig和Wukong来探索10亿数量级边的 网络图 测量社区 每个人都在和我说话:Twitter回复关系图 (度)degree 对称链接 社区提取 附录A 安装Apache Hadoop 附录B Cloudera’s ...

    Hadoop平台技术 排序操作案例.docx

    Hadoop平台技术 排序操作案例.docx 学习资料 复习资料 教学资源

    hadoop段海涛老师八天实战视频

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi ... 04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi

    论文研究-改进的Hadoop作业调度算法.pdf

    针对如上问题,深入分析了Hadoop源代码,理清了Hadoop的运行原理,在Hadoop资源管理机制Yarn中改进了Hadoop任务的排序,建立了新的任务排序规则,提出了对各节点性能评价的指标,分为动态性能指标和静态性能指标。...

    hadoop二次排序的原理和实现方法

    主要介绍了hadoop二次排序的原理和实现,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下

    云计算应用实验报告 武汉理工大学云计算应用 hadoop单机模式和伪分布式

    1、hadoop单机模式和伪分布式 2、hadoop集群 3、hadoop运行WordCount程序 ...要求从输入数据中找出工厂名和地址名的对应关系,输出“工厂名——地址名”表,按工厂名排序输出。数据文件自己设计样例。

Global site tag (gtag.js) - Google Analytics