
How to implement secondary sort in Hadoop

Posted: 2014-04-15
By default, Hadoop sorts the data it processes by the input key, which is enough for most business requirements. Sometimes, however, a requirement like the following comes up. Given this input:

<pre name="code" class="java">秦东亮;72
秦东亮;34
秦东亮;100
三劫;899
三劫;32
三劫;1
a;45
b;567
b;12
</pre>
Required output 1:
<pre name="code" class="java">a 45
b 12,567
三劫 1,32,899
秦东亮 34,72,100
</pre>
Required output 2:
<pre name="code" class="java">a 45
b 12
b 567
三劫 1
三劫 32
三劫 899
秦东亮 34
秦东亮 72
秦东亮 100
</pre>
Note that output 1 and output 2 follow the same logic; only the output format differs slightly. In this post I will walk through how to implement this kind of requirement in Hadoop.

This kind of requirement is essentially the same as a standard SQL grouping query:
SELECT A,B FROM TABLE GROUP BY  A,B ORDER BY A,B
Of course, the grouping is not necessarily on two fields; it may involve two or more.
Before the code, let's look at how MapReduce carries out this secondary sort internally. (The original post included a flow diagram here, which I had collected and which illustrates the process nicely.)

[Figure: MapReduce map / partition / shuffle / sort / reduce flow]

As the diagram shows, when the Map side processes data, the InputFormat component first provides the input format and splits the input into records. The default is TextInputFormat, where the key is the byte offset of each line and the value is the line content. Each line is passed to the Mapper, which splits it on an agreed-upon delimiter and performs the business logic; for a counting job, it simply emits 1 as the value. Before the map output is written, a Combiner (if configured) runs as a local reduce, generally as an optimization. After the Combiner, the Partitioner partitions the data; the default is HashPartitioner, which ANDs the hash of the output key with Integer.MAX_VALUE and takes the remainder modulo the number of reducers. After partitioning, the data is spilled to local disk in buckets, sorted by key on the way out. Once all maps have finished, the Reduce phase begins with a large shuffle: each reducer pulls its portion of the data from every map's output partitions. This is the most network- and disk-IO-intensive step and an important performance bottleneck. Once a reducer has pulled all of its data, it groups and sorts it by key; the reduce function is called once per group to accumulate or perform other business processing, and the results are written to HDFS via the OutputFormat. At that point the whole flow is complete.
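For reference, the default HashPartitioner mentioned above amounts to the following formula. This is a minimal sketch of that standard logic (the class name DefaultHashPartitionSketch is made up for illustration), not a copy of the Hadoop source:
<pre name="code" class="java">import org.apache.hadoop.mapreduce.Partitioner;

/** Sketch of Hadoop's default hash partitioning: mask off the sign bit, then take the remainder modulo the reducer count. */
public class DefaultHashPartitionSketch<K, V> extends Partitioner<K, V> {
	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}
</pre>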


The code is as follows:
<pre name="code" class="java">package com.qin.groupsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;


/**
* @author qindongliang
*
* Big data discussion QQ group: 376932160
*
*
* **/
public class GroupSort {

/**
* Map task
*
* */
public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{


private DescSort tx=new DescSort();
private IntWritable second=new IntWritable();

@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
System.out.println("执行map");
// System.out.println("进map了");
//mos.write(namedOutput, key, value);
String ss[]=value.toString().split(";");
String mkey=ss[0];
int mvalue=Integer.parseInt(ss[1]);
tx.setFirstKey(mkey);
tx.setSecondKey(mvalue);
second.set(mvalue);
context.write(tx, second);
}


}




/***
  * Reduce task
  *
  * **/
public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{
@Override
protected void reduce(DescSort arg0, Iterable<IntWritable> arg1, Context ctx)
throws IOException, InterruptedException {
System.out.println("执行reduce");
StringBuffer sb=new StringBuffer();

for(IntWritable t:arg1){

// sb.append(t).append(",");



ctx.write(new Text(arg0.getFirstKey()), new Text(t.toString()));


/** Writing inside the loop gives this output:
*
* a 45
* b 12
* b 567
* 三劫 1
* 三劫 32
* 三劫 899
* 秦东亮 34
* 秦东亮 72
* 秦东亮 100
*/


}

if(sb.length()>0){
sb.deleteCharAt(sb.length()-1);// strip the trailing comma
}


// Appending inside the loop and writing once outside it gives this format:
// b 12,567
// 三劫 1,32,899
// 秦东亮 34,72,100
// ctx.write(new Text(arg0.getFirstKey()), new Text(sb.toString()));


}



}


/***
  *
  * Custom composite key
  * **/
public static class DescSort implements  WritableComparable{

public DescSort() {
// TODO Auto-generated constructor stub
}
private String firstKey;
private int secondKey;


public String getFirstKey() {
return firstKey;
}
public void setFirstKey(String firstKey) {
this.firstKey = firstKey;
}
public int getSecondKey() {
return secondKey;
}
public void setSecondKey(int secondKey) {
this.secondKey = secondKey;
}




// @Override
// public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
// int arg4, int arg5) {
// return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);// note: the minus sign gives descending order
// }
//
// @Override
// public int compare(Object a, Object b) {
//
// return   -super.compare(a, b);// note: the minus sign gives descending order
// }
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
firstKey=in.readUTF();
secondKey=in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(firstKey);
out.writeInt(secondKey);

}
@Override
public int compareTo(Object o) {
DescSort d=(DescSort)o;
// comparing "this" first gives ascending order
return this.getFirstKey().compareTo(d.getFirstKey());
}


}


/**
* Grouping comparator: records are grouped by only the first field of the composite key
*
* **/
public static class TextComparator extends WritableComparator{

public TextComparator() {
super(DescSort.class,true);// register the comparator; true creates key instances for comparison
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
System.out.println("执行TextComparator分组排序");
DescSort d1=(DescSort)a;
DescSort d2=(DescSort)b;

return  d1.getFirstKey().compareTo(d2.getFirstKey());
}



}

/**
* Sort comparator strategy:
* within the same first key, order by the second field
*
* */
public static class TextIntCompartator extends WritableComparator{

public TextIntCompartator() {
super(DescSort.class,true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
DescSort d1=(DescSort)a;
DescSort d2=(DescSort)b;
System.out.println("执行组内排序TextIntCompartator");
if(!d1.getFirstKey().equals(d2.getFirstKey())){
return d1.getFirstKey().compareTo(d2.getFirstKey());
}else{

return Integer.compare(d1.getSecondKey(), d2.getSecondKey());// ascending; avoids int-subtraction overflow

}
}

}

/**
* Partitioning strategy
*
* */
public static class KeyPartition extends Partitioner<DescSort, IntWritable>{


@Override
public int getPartition(DescSort key, IntWritable arg1, int arg2) {
System.out.println("Running custom partitioner KeyPartition");
// partition on the first field only, so every record with the same
// first key is sent to the same reducer
return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%arg2;
}
}


public static void main(String[] args) throws Exception{
JobConf conf=new JobConf(ReadMapDB.class);
//Configuration conf=new Configuration();
    conf.set("mapred.job.tracker","192.168.75.130:9001");
    conf.setJar("tt.jar");
// note: the setJar call must come first, for initialization, otherwise an error is thrown


/** Job setup **/
Job job=new Job(conf, "testpartion");
job.setJarByClass(GroupSort.class);
System.out.println("Mode:  "+conf.get("mapred.job.tracker"));
// job.setCombinerClass(PCombine.class);



// job.setNumReduceTasks(3);// set the number of reduce tasks to 3
job.setMapperClass(GMapper.class);
job.setReducerClass(GReduce.class);

/** Set the partitioner */
job.setPartitionerClass(KeyPartition.class);

// grouping comparator: decides which map-output keys end up in the same reduce() call
job.setGroupingComparatorClass(TextComparator.class);
// sort comparator: orders the keys as the map output is sorted and merged



job.setSortComparatorClass(TextIntCompartator.class);

// swapped variant, kept here for comparison:
// job.setGroupingComparatorClass(TextIntCompartator.class);
// job.setSortComparatorClass(TextComparator.class);



job.setMapOutputKeyClass(DescSort.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
   
String path="hdfs://192.168.75.130:9000/root/outputdb";
FileSystem fs=FileSystem.get(conf);
Path p=new Path(path);
if(fs.exists(p)){
fs.delete(p, true);
System.out.println("输出路径存在,已删除!");
}
FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
FileOutputFormat.setOutputPath(job,p );
System.exit(job.waitForCompletion(true) ? 0 : 1); 


}



}
</pre>
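One note on the composite key above: DescSort.compareTo only compares firstKey, and the full two-field ordering is supplied by the sort comparator registered in the driver. A common alternative, shown here only as a minimal sketch (the class name DescSortFullOrder is hypothetical, getters/setters are omitted, imports as in the listing above), folds the full comparison into compareTo itself, so the composite key sorts correctly even without job.setSortComparatorClass; the grouping comparator on the first field is still needed:
<pre name="code" class="java">// Hypothetical variant of the composite key with the full ordering in compareTo.
public static class DescSortFullOrder implements WritableComparable<DescSortFullOrder> {

	private String firstKey;
	private int secondKey;

	@Override
	public void readFields(DataInput in) throws IOException {
		firstKey = in.readUTF();
		secondKey = in.readInt();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(firstKey);
		out.writeInt(secondKey);
	}

	@Override
	public int compareTo(DescSortFullOrder o) {
		int cmp = firstKey.compareTo(o.firstKey);       // primary: group field, ascending
		if (cmp != 0) {
			return cmp;
		}
		return Integer.compare(secondKey, o.secondKey); // secondary: value field, ascending
	}
}
</pre>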
Running the GroupSort job in Eclipse prints the following log:
<pre name="code" class="java">模式:  192.168.75.130:9001
输出路径存在,已删除!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=7040
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9807
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=86
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=162
INFO - Counters.log(589) |     HDFS_BYTES_READ=205
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=111232
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=86
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=93
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=162
INFO - Counters.log(589) |     Map input records=9
INFO - Counters.log(589) |     Reduce shuffle bytes=162
INFO - Counters.log(589) |     Spilled Records=18
INFO - Counters.log(589) |     Map output bytes=138
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=970
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=9
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=258830336
INFO - Counters.log(589) |     Reduce output records=9
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1461055488
INFO - Counters.log(589) |     Map output records=9
</pre>
After the job finishes, we check the result file in the output directory on HDFS. Its contents are:
<pre name="code" class="java">a 45
b 12
b 567
三劫 1
三劫 32
三劫 899
秦东亮 34
秦东亮 72
秦东亮 100
</pre>
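If you want output 1 instead, with all values of a key joined by commas on a single line, only the reducer changes, following the commented-out StringBuffer approach in the listing above. A minimal sketch (GReduceJoined is a hypothetical name):
<pre name="code" class="java">public static class GReduceJoined extends Reducer<DescSort, IntWritable, Text, Text>{
	@Override
	protected void reduce(DescSort key, Iterable<IntWritable> values, Context ctx)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();
		for (IntWritable v : values) {
			sb.append(v.get()).append(",");   // values arrive already sorted ascending
		}
		if (sb.length() > 0) {
			sb.deleteCharAt(sb.length() - 1); // strip the trailing comma
		}
		// one line per group, e.g.  秦东亮  34,72,100
		ctx.write(new Text(key.getFirstKey()), new Text(sb.toString()));
	}
}
</pre>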

The result matches what we expected. Being familiar with how MapReduce executes also helps us use Hive better, since a Hive query is itself compiled into one or more MapReduce jobs, and how a Hive statement is optimized has a non-negligible impact on the performance of those jobs. So it is well worth getting comfortable with the MapReduce programming model to build a clearer understanding of what is going on underneath.




