`

MapReduce2中自定义排序分组

 
阅读更多

 

1 Map 、Reduce和主类 

  

package com.wzt.mapreduce.secondsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.wzt.mapreduce.wordcount.WCRunner;

public class SecSortMain {

	public static class SecSortMapper extends Mapper<LongWritable, Text, FirstSortEntity, IntWritable> {
		
		protected void map(LongWritable key, Text value, Context context)
				throws  IOException, InterruptedException {
			 
			String line = value.toString();
			String[] spilted = line.split(" ");
			
			// 为了显示效果而输出Mapper的输出键值对信息
			System.out.println("Mapper输出<" + spilted[0] + "," + spilted[1] + ">"+this);
			context.write(new FirstSortEntity(spilted[0], Integer.parseInt(spilted[1]))  , new IntWritable(Integer.parseInt(spilted[1])) );
		};
		
	}

	public static class SecSortReducer extends Reducer<FirstSortEntity, IntWritable , FirstSortEntity, IntWritable> {
		
		@Override
		protected void reduce(
				FirstSortEntity key,
				Iterable<IntWritable> values,
				Context context)
				throws IOException, InterruptedException {
			
			// 显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
			System.out.println("Reducer输入分组<" + key+ ",N(N>=1)>"+this);
			StringBuffer sb = new StringBuffer() ; 
			for (IntWritable value : values) {
				//count += value.get();
				// 显示次数表示输入的k2,v2的键值对数量
				sb.append( value+" , " ) ;
				System.out.println("Reducer输入键值对<" + key.toString() + "," + value.get() + ">  组"+sb.toString() );
			}
//			if(sb.length()>0){
//				sb.deleteCharAt( -1 ) ;
//			}

			context.write(key, key.getSecondkey());
			//context.write(key.getFirstkey(),  new Text(sb.toString() ));
			
		}
		
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration() ; 
		Job job = Job.getInstance(conf) ;
		
		job.setJarByClass(WCRunner.class );
		
		job.setMapperClass( SecSortMapper.class );
		job.setMapOutputKeyClass( FirstSortEntity.class);
		job.setMapOutputValueClass( IntWritable.class );
		 
		//设置分区方法
		job.setPartitionerClass( SSPartintioner.class);//不同
		//会有几个reduce去执行最后的汇总数据, 有几个分区就要有几个reduce ,最后就会生成几个reduce ,这里设置为1 ,没看到调用但是确实分区了,没弄明白
		job.setNumReduceTasks(1);//当任务数为1的时候设置Partitioner是没有用的
		
		//数据做总的排序
		job.setSortComparatorClass(MySSSortComparator.class) ; //排序
		//总数据  记性分组 
		job.setGroupingComparatorClass( GroupComparator.class );//分组
		
		job.setReducerClass( SecSortReducer.class );
		job.setOutputKeyClass( FirstSortEntity.class );
		job.setOutputValueClass(IntWritable.class );
		
		
//		FileInputFormat.setInputPaths(job,  "/wc/input/xiyou.txt");
//		FileOutputFormat.setOutputPath(job,  new Path("/wc/output6"));
		FileInputFormat.setInputPaths(job,  "/sort/input");
		FileOutputFormat.setOutputPath(job,  new Path("/sort/output1"));
		
 		job.waitForCompletion(true) ; 
	}
}

 

2 自定义 组合key 

 

package com.wzt.mapreduce.secondsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * 自定义组合件 
 * @author root
 *
 */
public class FirstSortEntity implements WritableComparable<FirstSortEntity>{

	private Text firstkey ; 
	private IntWritable secondkey ;
	
	public FirstSortEntity( ) {
	}
	
	public FirstSortEntity(Text firstkey, IntWritable secondkey) {
		this.firstkey = firstkey;
		this.secondkey = secondkey;
	}
	public FirstSortEntity(String firstkey, int secondkey) {
		this.firstkey = new Text(firstkey);
		this.secondkey = new IntWritable(secondkey);
	}
	
	public Text getFirstkey() {
		return firstkey;
	}
	public void setFirstkey(Text firstkey) {
		this.firstkey = firstkey;
	}
	public IntWritable getSecondkey() {
		return secondkey;
	}
	public void setSecondkey(IntWritable secondkey) {
		this.secondkey = secondkey;
	}
	/**
	 * 对象序列化
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		 out.writeUTF(firstkey.toString() );
		 out.writeInt(  secondkey.get() );
	}

	//对象反序列化
	@Override
	public void readFields(DataInput in) throws IOException {
		 
		firstkey = new Text(in.readUTF() );
		secondkey = new IntWritable(in.readInt()); 
	}

	
	/**
	 * 排序在map执行后数据传出后 会调用这个方法对key进行排序 
	 * 数据map后,如果设置了分区并且reduce>1 的话,会执行分区类方法,进行分区
	 */
	@Override
	public int compareTo(FirstSortEntity entity) {
		//利用这个来控制升序或降序
		//this本对象写在前面代表是升序
		//this本对象写在后面代表是降序
		return this.firstkey.compareTo( entity.getFirstkey());
		//return this.secondkey.get()>entity.getSecondkey().get()?1:-1;	
	}
	@Override
	public String toString() {
		return this.getFirstkey() +" "+this.getSecondkey()+ "   "  ;
	} 

}

 3 自定义分区 

 

package com.wzt.mapreduce.secondsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

//自定义 分区
public class SSPartintioner extends Partitioner<FirstSortEntity, IntWritable>{
 
	/**
	 * key map输出的key
	 * value map 输出的value 
	 *  map后的数据 经过排序后传进这个分区方法,如果返回的值相同的数据,值相同的数据会分配到一组中 ,即 放到一堆 
	 *  到此 数据为N堆,并且数据是经过排序的 
	 */
	@Override
	public int getPartition(FirstSortEntity key, IntWritable value,
			int numPartitions) {
			System.out.println("Partitioner  key:"+key.getFirstkey()+"  value:"+value+"  "+ ( ( key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions ) +"   "+this);
			//System.out.println("Partitioner  key:"+key.getFirstkey()+"  value:"+value+"  "+ ((key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions) +"   "+this);
			
	       return (key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions;
			//return (key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions;
	}
	 
	
}

   个人理解以上都是在Map阶段进行,即本地操作,以下为Map到Reduce这段进行的

 

4  自定义整体排序 

  

package com.wzt.mapreduce.secondsort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


//组内自定义排序策略
/**
 * @author root
 *
 */
public class MySSSortComparator extends WritableComparator{

	public MySSSortComparator() {//注册处理的试题类型 
		super(FirstSortEntity.class,true);
	}
	
	/**
	 *  reduce 处理数据之前 
	 *  对全量数据排序 
	 *  逻辑:分组一样则按照第二个参数排序  ,分组不一样,则按照第一个参数排序  
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		
		FirstSortEntity e1 = (FirstSortEntity)a;
		FirstSortEntity e2 = (FirstSortEntity)b;
		System.out.println( e1.getFirstkey()+"==MySSSortComparator 排序 。。 "+e2.getFirstkey());
		//首先要保证是同一个组内,同一个组的标识就是第一个字段相同
		if(!e1.getFirstkey().equals( e2.getFirstkey())){
			return e1.getFirstkey().compareTo(e2.getFirstkey());
		}else{
			return e1.getSecondkey().get() - e2.getSecondkey().get() ; 
		}
	}
}

 

5 自定义分组  

 

   

package com.wzt.mapreduce.secondsort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


//对象分组策略 
//数据放到 reduce前 ,对数据进行分组 
public class GroupComparator extends WritableComparator{

	public GroupComparator() { //注册处理的试题类型 
		super(FirstSortEntity.class,true ) ; 
	}
	
	
	/**
	 * 对排序后的数据 分组, 
	 * 第一个参数相同的,放到一个key的 迭代器 集合中  
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		FirstSortEntity e1 = (FirstSortEntity)a;
		FirstSortEntity e2 = (FirstSortEntity)b;
		System.out.println( e1.getFirstkey()+"==GroupComparator = 分组=="+e2.getFirstkey());
		return  e1.getFirstkey().toString().compareTo( e2.getFirstkey().toString());
		//return  e1.getSecondkey().compareTo( e2.getSecondkey());
	}
}

 在以后就是主类中的reduce进行数据处理

  下面这个类作为自己的记录,这里没用:

   

package com.wzt.mapreduce.secondsort;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;

//自定义分组比较器
//这个类 暂时没用, 分组比较器的 实现,但没有测试 
public class SSGroupComparator implements RawComparator<FirstSortEntity>{

	@Override
	public int compare(FirstSortEntity o1, FirstSortEntity o2) {
	 
		return o1.getSecondkey().get()>o2.getSecondkey().get()?1:-1;
	}
 
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    	 
    	//对象可以这样反序列化 
    	//IntWritable d ; 
    	System.out.println( "SSGroupComparator   自定义分组 =" );
    	ByteArrayInputStream bis = new ByteArrayInputStream(b1);
    	DataInput in1 = new DataInputStream(bis); 
    	FirstSortEntity entity1 = new FirstSortEntity();
    	
    	ByteArrayInputStream bis2 = new ByteArrayInputStream(b2);
    	DataInput in2 = new DataInputStream(bis2); 
    	FirstSortEntity entity2 = new FirstSortEntity();
    	try {
			entity1.readFields(in1);
			entity2.readFields(in2);
		} catch (IOException e) {
			e.printStackTrace();
		}
     
        return entity1.getFirstkey().compareTo( entity2.getFirstkey());
    }
 

}

 

 

 

 

 

分享到:
评论

相关推荐

    16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN

    16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN 网址:https://blog.csdn.net/chenwewi520feng/article/details/130454036 本文介绍MapReduce常见的基本用法。 前提是hadoop环境可正常运行。 ...

    Hadoop中MapReduce基本案例及代码(五)

    对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

    Hadoop 培训课程(4)MapReduce_2

    Hadoop 培训课程(4)MapReduce_2 标准和自定义计数器* Combiner和Partitioner编程** 自定义排序和分组编程** 常见的MapReduce算法** ---------------------------加深拓展---------------------- 常见大数据处理方法*

    htuple:在 MapReduce 中简化复合字段分区、排序和分组的库

    在 MapReduce 中,使用复合映射输出键并自定义对哪些字段进行分区、排序和分组可能很乏味,尤其是在跨多个作业执行此操作时。 这个库的目标是提供一个Tuple类,它可以包含多个元素,并提供一个ShuffleUtils类,为您...

    mapreduceDemo.zip

    主要包括mapreduce自定义分区,排序,分组的过程,最后以实际在生产中的小demo作为演示,可以让初学者对于mapreduce程序有更深层次的了解。

    hadoop段海涛老师八天实战视频

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi ... 04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi

    Hadoop01DemoJar

    温度排序,实现自定义分组分区排序,到出来的jar

    Hadoop实战(第2版)

    技术点46 避免reducer 技术点47 过滤和投影技术点48 使用 combiner技术点49 超炫的使用比较器的快速排序6.4.4 减轻倾斜技术点50 收集倾斜数据技术点51 减轻reducer 阶段倾斜6.4.5 在MapReduce 中优化...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi 06-shuffle机制.avi 07-mr程序的组件全貌.avi 08-textinputformat对切片规划的源码分析.avi 09-倒排索引的mr实现.avi 10-多个job在同一个...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点26 在HDFS、MapReduce、Pig 和Hive 中使用数据压缩 技术点27 在MapReduce、Hive 和Pig 中处理可分割的LZOP 5.3 本章小结 6 诊断和优化性能问题 6.1 衡量MapReduce 和你的环境 6.1.1 提取作业统计...

    hadoop中级班视频教程.zip

    目录网盘文件永久链接 1-MapReduce.rar 2 MapReducel的源简介和自定义类型rar 3 mapReducel的剩余核环节解rar 4 MapReduce的自定V排序和分组rar 5 hadoop的集群安装和安全模式个绍rar 代码部分rar

    Hadoop权威指南 第二版(中文版)

    第2章 关于MapReduce  一个气象数据集  数据的格式  使用Unix工具进行数据分析  使用Hadoop分析数据  map阶段和reduce阶段  横向扩展  合并函数  运行一个分布式的MapReduce作业  Hadoop的Streaming  Ruby...

    Hadoop权威指南(中文版)2015上传.rar

    第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 ...

Global site tag (gtag.js) - Google Analytics