Belinda407

Secondary sort via a composite key, relying on Hadoop's built-in sorting

 

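Problem: the records have the structure <task id, resource type, release date, word, frequency>.

Frequencies have already been aggregated by (task id, resource type, release date, word). The task now is to group by <task id, resource type, release date>, sort each group by frequency in descending order, and extract the top 200 records of each group.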
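To make the target result concrete before moving to MapReduce, here is a minimal in-memory sketch of "group by head, sort by descending frequency, keep the top N". It is not the Hadoop job itself; the class `TopNPerGroupDemo`, the `Row` type, and its field names are assumptions introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only: group rows by head, sort each group by
// descending frequency, keep the first `limit` words.
class TopNPerGroupDemo {

    // One aggregated record; the field names are illustrative assumptions.
    static class Row {
        final String taskId;
        final String sourceType;
        final String releaseDate;
        final String word;
        final long freq;

        Row(String taskId, String sourceType, String releaseDate, String word, long freq) {
            this.taskId = taskId;
            this.sourceType = sourceType;
            this.releaseDate = releaseDate;
            this.word = word;
            this.freq = freq;
        }

        // Group identity: <task id, resource type, release date>.
        String group() {
            return taskId + "_" + sourceType + "_" + releaseDate;
        }
    }

    static Map<String, List<String>> topPerGroup(List<Row> rows, int limit) {
        // Collect rows per group, preserving first-seen group order.
        Map<String, List<Row>> byGroup = new LinkedHashMap<>();
        for (Row row : rows) {
            byGroup.computeIfAbsent(row.group(), k -> new ArrayList<>()).add(row);
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Row>> entry : byGroup.entrySet()) {
            List<Row> sorted = new ArrayList<>(entry.getValue());
            // Highest frequency first; ties broken by word.
            sorted.sort(Comparator.comparingLong((Row r) -> r.freq).reversed()
                    .thenComparing((Row r) -> r.word));
            List<String> words = new ArrayList<>();
            for (Row r : sorted.subList(0, Math.min(limit, sorted.size()))) {
                words.add(r.word);
            }
            result.put(entry.getKey(), words);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("1", "news", "2012-01-01", "hadoop", 5),
                new Row("1", "news", "2012-01-01", "key", 2),
                new Row("1", "news", "2012-01-01", "sort", 9),
                new Row("2", "blog", "2012-01-02", "map", 1));
        System.out.println(topPerGroup(rows, 2));
    }
}
```

This version holds every group in memory, which is exactly what the secondary-sort approach avoids: there the framework delivers each group's records already ordered, so the reducer only has to read the first 200.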

 

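The approach follows org.apache.hadoop.examples.SecondarySort, one of the examples shipped with Hadoop.

Composite key: WordFreq<TagHead, word, frequency>, where TagHead represents the group, i.e. <task id, resource type, release date>.

1. In WordFreq, override compareTo so that records within a group sort by descending frequency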

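@Override
public int compareTo(WordFreq other) {
    // ComparisonChain comes from Guava (com.google.common.collect.ComparisonChain)
    return ComparisonChain.start()
            // group first, so that all records of one group stay adjacent
            .compare(this.getGroup(), other.getGroup())
            // arguments swapped: higher frequency sorts first
            .compare(other.count, this.count)
            // tie-break on the word for a deterministic order
            .compare(this.tag.getWord(), other.tag.getWord())
            .result();
}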
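The ordering this compareTo produces can be checked without Guava or Hadoop. The sketch below re-implements the same three-step comparison in plain Java; `CompositeKeyOrderDemo` and its simplified `Key` type (with the group flattened to a single string) are hypothetical stand-ins for WordFreq, not the original classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class CompositeKeyOrderDemo {

    // Simplified stand-in for WordFreq: the group is flattened to one string.
    static class Key implements Comparable<Key> {
        final String group;
        final String word;
        final long count;

        Key(String group, String word, long count) {
            this.group = group;
            this.word = word;
            this.count = count;
        }

        @Override
        public int compareTo(Key other) {
            // 1. group ascending, so one group's keys are contiguous
            int c = group.compareTo(other.group);
            if (c != 0) {
                return c;
            }
            // 2. count descending (operands swapped)
            c = Long.compare(other.count, count);
            if (c != 0) {
                return c;
            }
            // 3. word ascending as a tie-break
            return word.compareTo(other.word);
        }

        @Override
        public String toString() {
            return group + ":" + word + "=" + count;
        }
    }

    // Returns the keys in sorted order, rendered as strings.
    static List<String> sortedView(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> out = new ArrayList<>();
        for (Key k : copy) {
            out.add(k.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sortedView(Arrays.asList(
                new Key("g2", "a", 1),
                new Key("g1", "b", 3),
                new Key("g1", "a", 5),
                new Key("g1", "c", 3))));
    }
}
```

Sorting on the full composite key is what lets the shuffle phase do the "secondary" ordering: the framework only ever sorts keys, so the frequency has to be part of the key.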

2. In TagHead, override the following three methods

@Override
public int compareTo(TagHead other) {
    return ComparisonChain.start()
            .compare(this.tagsid, other.tagsid)
            .compare(this.sourceType, other.sourceType)
            .compare(this.releaseDateDay, other.releaseDateDay)
            .result();
}

@Override
public boolean equals(Object o) {
    if (o instanceof TagHead) {
        TagHead other = (TagHead) o;
        // Objects.equals (java.util.Objects) tolerates null fields, consistent with hashCode
        return Objects.equals(this.tagsid, other.tagsid)
                && Objects.equals(this.sourceType, other.sourceType)
                && Objects.equals(this.releaseDateDay, other.releaseDateDay);
    }
    return false;
}

@Override
public int hashCode() {
    // Null-safe, and does not require the fields to be numeric strings
    // (a hash built with Integer.parseInt throws NumberFormatException otherwise)
    return Objects.hash(this.tagsid, this.sourceType, this.releaseDateDay);
}
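Because the partitioner in step 5 routes records by the group's hashCode, two heads that are equal must always hash to the same value, and the hash must not fail on unexpected field contents. The following is a small sketch of that contract using java.util.Objects; `HeadHashDemo` and its `Head` class are hypothetical stand-ins for TagHead, with field names borrowed from the snippets above.

```java
import java.util.Objects;

class HeadHashDemo {

    // Hypothetical stand-in for TagHead with the same three fields.
    static class Head {
        final String tagsid;
        final String sourceType;
        final String releaseDateDay;

        Head(String tagsid, String sourceType, String releaseDateDay) {
            this.tagsid = tagsid;
            this.sourceType = sourceType;
            this.releaseDateDay = releaseDateDay;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Head)) {
                return false;
            }
            Head other = (Head) o;
            return Objects.equals(tagsid, other.tagsid)
                    && Objects.equals(sourceType, other.sourceType)
                    && Objects.equals(releaseDateDay, other.releaseDateDay);
        }

        @Override
        public int hashCode() {
            // Built from String.hashCode, which is specified by the JDK and
            // therefore identical on every JVM in the cluster.
            return Objects.hash(tagsid, sourceType, releaseDateDay);
        }
    }

    public static void main(String[] args) {
        Head a = new Head("task-a", "news", "2012-01-01");
        Head b = new Head("task-a", "news", "2012-01-01");
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

A hash that depends only on the field values matters here: map tasks run in separate JVMs, so anything based on object identity would send the same group to different reducers.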

 

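3. Simple item: SimpleWordFreq<word, frequency>

4. Mapper and Reducer: the map output key is WordFreq and the map output value is SimpleWordFreq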

public class SubSortingWordFreqMapper extends
        Mapper<LongWritable, Text, WordFreq, SimpleWordFreq> {
    ...
}

 

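public static class SubSortingWordFreqReducer extends
        Reducer<WordFreq, SimpleWordFreq, Text, NullWritable> {
    @Override
    protected void reduce(WordFreq key, Iterable<SimpleWordFreq> values,
            Context context) throws IOException, InterruptedException {
        // With the partitioner and grouping comparator of steps 5 and 6 in place,
        // one call covers one group and the values arrive in descending frequency order
        for (SimpleWordFreq value : values) {
            ...
        }
    }
}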
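Since the values of a group reach the reducer already ordered by descending frequency, extracting the top 200 amounts to reading the first 200 elements and stopping. The loop body is elided in the post, so the following is only a sketch of that idea in plain Java; `TopNLoopDemo` and `firstN` are hypothetical names, and a real reducer would presumably call context.write at the point where this sketch appends to a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopNLoopDemo {

    // Takes at most `limit` elements from an already sorted sequence.
    static <T> List<T> firstN(Iterable<T> values, int limit) {
        List<T> out = new ArrayList<>();
        for (T value : values) {
            if (out.size() >= limit) {
                break; // the remaining values all rank lower; skip them
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Frequencies as they would arrive for one group: highest first.
        System.out.println(firstN(Arrays.asList(9, 5, 2, 1), 2));
    }
}
```

One caveat when transferring this to Hadoop: the framework reuses the same value object across iterations, so a reducer should write out (or copy) each value inside the loop rather than collect references in a list as this sketch does.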

5. Custom Partitioner: partition on the hash of the natural key, i.e. the group, so that all records of one group reach the same reducer

public class TagCloudPartitioner extends Partitioner<WordFreq, SimpleWordFreq> {
    private static final Logger log = LoggerFactory
            .getLogger(TagCloudPartitioner.class);

    @Override
    public int getPartition(WordFreq key, SimpleWordFreq value, int numPartitions) {
        int hashCode = key.getGroup().hashCode();
        log.debug(key.getGroup().getHead("_") + ";hashCode=" + hashCode);
        // Mask the sign bit: hashCode may be negative, and the partition index
        // must lie in [0, numPartitions)
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }
}
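A partition index has to fall in the range 0 to numPartitions - 1, yet Java's % operator takes the sign of its left operand, so a negative hashCode yields a negative remainder. The sketch below contrasts the plain remainder with the sign-bit masking idiom that Hadoop's own HashPartitioner uses; `PartitionIndexDemo` and its method names are hypothetical.

```java
class PartitionIndexDemo {

    // Plain remainder: negative whenever hashCode is negative.
    static int naive(int hashCode, int numPartitions) {
        return hashCode % numPartitions;
    }

    // Clears the sign bit first, so the result is always in [0, numPartitions).
    static int masked(int hashCode, int numPartitions) {
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println("naive:  " + naive(-7, 4));
        System.out.println("masked: " + masked(-7, 4));
    }
}
```

Math.floorMod(hashCode, numPartitions) would be another way to get a non-negative index; masking is shown here only because it mirrors what the default partitioner does.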

 

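6. Custom grouping comparator: compares only the group part of the key, so that one reduce call receives every record of a group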

public class TagCloudHeadGroupingComparator extends WritableComparator {

    protected TagCloudHeadGroupingComparator() {
        // true: let WritableComparator create WordFreq instances for deserialization
        super(WordFreq.class, true);
    }

    @Override
    public int compare(WritableComparable tp1, WritableComparable tp2) {
        WordFreq wordFreq = (WordFreq) tp1;
        WordFreq wordFreq2 = (WordFreq) tp2;
        // Only the group is compared; word and frequency are ignored here
        return wordFreq.compareGroup(wordFreq2);
    }
}
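The effect of a grouping comparator can be pictured as cutting the sorted key stream into runs: consecutive keys that compare as equal end up in the same reduce call. Below is a rough plain-Java simulation of that behaviour, not Hadoop's actual implementation; `GroupingDemo`, `groupRuns`, and the "group:word" string encoding of keys are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingDemo {

    // Compares "group:word" strings by the part before the colon only.
    static final Comparator<String> BY_GROUP =
            Comparator.comparing((String s) -> s.substring(0, s.indexOf(':')));

    // Splits an already sorted key list into runs of keys that the
    // grouping comparator treats as equal.
    static <K> List<List<K>> groupRuns(List<K> sortedKeys, Comparator<? super K> grouping) {
        List<List<K>> runs = new ArrayList<>();
        List<K> current = null;
        K previous = null;
        for (K key : sortedKeys) {
            // Start a new run at the first key and at every group boundary.
            if (current == null || grouping.compare(previous, key) != 0) {
                current = new ArrayList<>();
                runs.add(current);
            }
            current.add(key);
            previous = key;
        }
        return runs;
    }

    public static void main(String[] args) {
        System.out.println(groupRuns(Arrays.asList("g1:a", "g1:b", "g2:c"), BY_GROUP));
    }
}
```

Only adjacent keys are compared, which is why the sort order from step 1 has to place the group first: if a group's keys were not contiguous, they would be split across several runs.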

7. Extra settings required when configuring the job

job.setPartitionerClass(TagCloudPartitioner.class);

job.setGroupingComparatorClass(TagCloudHeadGroupingComparator.class);
