`
datamachine
  • 浏览: 157312 次
社区版块
存档分类
最新评论

Hadoop如何实现关联计算

阅读更多
    选择Hadoop,低成本和高扩展性是主要原因,但但它的开发效率实在无法让人满意。
    以关联计算为例。
    假设:HDFS上有2个文件,分别是客户信息和订单信息,customerID是它们之间的关联字段。如何进行关联计算,以便将客户名称添加到订单列表中?
    一般方法是:输入2个源文件。根据文件名在Map中处理每条数据,如果是Order,则在foreign key上加标记”O”,形成combined key;如果是Customer则做标记”C”。Map之后的数据按照key分区,再按照combined key分组排序。最后在reduce中合并结果再输出。
实现代码:
public static class JMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    //mark every row with "O" or "C" according to file name
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
    if (pathName.contains("order.txt")) {//identify order by file name
            String values[] = value.toString().split("\t");
            TextPair tp = new TextPair(new Text(values[1]), new Text("O"));//mark with "O"
            context.write(tp, new Text(values[0] + "\t" + values[2]));
        }
   if (pathName.contains("customer.txt")) {//identify customer by file name
           String values[] = value.toString().split("\t");
           TextPair tp = new TextPair(new Text(values[0]), new Text("C"));//mark with "C"
           context.write(tp, new Text(values[1]));
        }
    }
}
public static class JPartitioner extends Partitioner<TextPair, Text> {
    //partition by key, i.e. customerID
    @Override
    public int getPartition(TextPair key, Text value, int numParititon) {
        return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
    }
}
public static class JComparator extends WritableComparator {
    //group by muti-key
    public JComparator() {
        super(TextPair.class, true);
    }
    @SuppressWarnings("unchecked")
    public int compare(WritableComparable a, WritableComparable b) {
        TextPair t1 = (TextPair) a;
        TextPair t2 = (TextPair) b;
        return t1.getFirst().compareTo(t2.getFirst());
    }
}
public static class JReduce extends Reducer<TextPair, Text, Text, Text> {
    //merge and output
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
    Text pid = key.getFirst();
    String desc = values.iterator().next().toString();
    while (values.iterator().hasNext()) {
        context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc));
   }
    }
}
public class TextPair implements WritableComparable<TextPair> {
    //make muti-key
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) {
        set(first, second);
    }
    public void set(Text first, Text second) {
  this.first = first;
  this.second = second;
    }
    public Text getFirst() {
  return first;
    }
    public Text getSecond() {
  return second;
    }
    public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
    }
    public void readFields(DataInput in) throws IOException {
  first.readFields(in);
  second.readFields(in);
    }
    public int compareTo(TextPair tp) {
  int cmp = first.compareTo(tp.first);
  if (cmp != 0) {
       return cmp;
  }
    return second.compareTo(tp.second);
    }
}
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
    //job entrance
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
    String[] otherArgs = parser.getRemainingArgs();
    if (agrs.length < 3) {
   System.err.println("Usage: J <in_path_one> <in_path_two> <output>");
   System.exit(2);
    }
    Job job = new Job(conf, "J");
    job.setJarByClass(J.class);//Join class
    job.setMapperClass(JMapper.class);//Map class
    job.setMapOutputKeyClass(TextPair.class);//Map output key class
    job.setMapOutputValueClass(Text.class);//Map output value class
    job.setPartitionerClass(JPartitioner.class);//partition class
    job.setGroupingComparatorClass(JComparator.class);//condition group class after partition
    job.setReducerClass(Example_Join_01_Reduce.class);//reduce class
    job.setOutputKeyClass(Text.class);//reduce output key class
    job.setOutputValueClass(Text.class);//reduce ouput value class
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//one of source files
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//another file
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);//run untill job ends
}

    不能直接使用原始数据,而是要搞一堆代码处理标记,并绕过MapReduce原本的架构,最后从底层设计并计算数据之间的关联关系。这还是最简单的关联计算,如果用MapReduce进行多表关联或逻辑更复杂的关联计算,复杂度会呈几何级数递增。
0
0
分享到:
评论

相关推荐

    Hadoop开发者第一期入门专刊

    目录:1 Hadoop 介绍 2 Hadoop 在国内应用情况 3 Hadoop 源代码eclipse 编译教程 7 在Windows 上安装Hadoop 教程 13 在Linux 上安装Hadoop...59 表关联在MapReduce 上的实现 63 Hadoop 计算平台和Hadoop 数据仓库的区别

    基于 hadoop实现的金庸江湖人物关系网分析+源代码+文档说明

    在人物关系图中,通过标签传播算法可以将关联度比较大的人物分到同一标签,可以直观地分析人物间的关系。 -------- 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是...

    基于Java(hadoop)实现的图书推荐系统【100011261】

    大学SDU大数据BigData课程设计,基于hadoop实现的图书推荐系统。 基于 Apriori 关联规则挖掘算法进行图书推荐的应用算法设计和实现,将利用大量图书评论数据,使用 MapReduce 并行化处理技术来完成图书的 k-频繁项集...

    基于Hadoop&amp,Spark的关联规则实践+源代码+文档说明

    基于Hadoop&Spark的关联规则算法实践 --- ### 1. 实践目的 &gt; 此次实践主要目的在于,希望通过亲身实践,加深自己对Hadoop、Spark两类大数据工具的理解,熟悉其从集群部署到运作的基本流程,了解FPGrowth算法的基本...

    Hadoop实战丛书

    《Hadoop实战》作为云计算所青睐的分布式架构,Hadoop是一个用Java语言实现的软件框架,在由大量计算机组成的集群中运行海量数据的分布式计算,是谷歌实现云计算的重要基石。《Hadoop实战》分为3个部分,深入浅出地...

    大数据环境下基于Hadoop框架的数据挖掘算法的研究与实现

    为了提高大数据环境下的数据挖掘速度,对分布式计算构架Hadoop进行分析与研究,提出一种基于Hadoop平台的大数据关联规则挖掘算法MRPrePost。该算法在PrePost算法基础上改进而来,采用Hadoop平台降低分布式编程的难度...

    一种基于Hadoop的并行关联规则算法 (2011年)

    针对传统的并行计算,存在不能处理节点失效,难以处理负载均衡等问题,提出基于Hadoop架构实现并行关联规则计算的设计.理论和实验证明,基于Hadoop的并行关联规则计算,能处理节点失效,并且能做到节点负载均衡.

    Hadoop海量网络数据处理平台的关键技术

    首先该算法在主节点中引入了节点动态性能推断模块,该模块采用基于指数平滑预测法实现对该集群中运行的作业历史数据学习分析,从而计算出集群中各个节点的计算能力。然后本文结合集群节点的性能指标对Reduce任务分配...

    基于Map_Reduce的并行关联分析方法

    本文在研究BIRCH算法、规则...map/reduce相结合,实现了算法的并行化计算,在一定程度上提高了算法的运行效率。最后的 实验结果表明,采用该关联方法可以有效提高聚类效果,减少重复报警,缩短关联分析的执行 时间。

    hadoop倒排索引实现 完整代码+报告

    因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,并且对每个词设置词频 1(text)。 接下来在 combiner 中,我们统计每个单词的 ...

    基于Hadoop集群的多表并行关联算法及应用

    针对因特网环境下并行数据库实现多个大数据表关联存在的计算瓶颈,基于 Hadoop集群设计了一个并行关联多个大数据表的简便算法MR_Join。以商业网站凡客诚品的销售数据为例进行实验,验证算法的可行性并做出应用实例。...

    滴滴出行实时计算系统架构及实践

    (5)实时数据流与外部系统或服务关联计算 2. OLAP系统架构选型:KV store与column store 哪种存储更适合OLAP应用? 3. 简要介绍Kafka的特性,阐述为何Kafka是实时计算系统中理想的数据存储方案 4. 详细介绍Druid的...

    基于spark的电商商品智能分析系统,采用流式计算电商商品关注度,实现商品智能推荐及关联分析.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    大数据应用测试经验总结.pdf

    计算引擎:使⽤Hive on Tez计算引擎实现ETL跑批任务;使⽤spark streaming实现实时计算;使⽤Phoenix做前台交互式查询。 3. 数据存储:使⽤Kafka、Hive、Hbase、MySQL满⾜各层次存储技术需求。 4. 任务调度:使⽤...

    Stock-Volatility-Hadoop-MapReduce

    Mapper-1的方法和实现我们给了2970个股票数据,每个数据具有三年的数据。 第一个mapreduce作业将读取所有输入的CSV文件,且映射器的数量等于文件的数量。 每个映射器将逐行读取数据。 每行包含7个数据信息,即日期,...

    软件技术《行业标准-人工智能》.doc

    目前流行的分布式计算框架如 OpenStack、Hadoop、Storm、Spark、Samza、Bigflow 等。各种开源深度学 习框架也层出不穷,其中包括 TensorFlow、Caffe、Keras、CNTK、Torch7、MXNet、 Leaf、Theano、DeepLearning4、...

    hash-join-algorithm-mapreduce:哈希连接算法的 Java 实现,该算法在给定连接键的情况下计算多个表的连接操作

    这是哈希连接算法的 Java 实现,它在给定连接键的几个表上应用连接操作。 该算法已经在机场频率数据集和机场通用数据集上进行了测试。... 在 src/HashJoinMapReduce 中: 您可以找到使用 Hadoop 的分布式实现

    MapReduce的模式、算法和用例

    所有描述性的文字和代码都使用了标准hadoop的MapReduce模型,包括Mappers,Reduces,Combiners,Partitioners,和sorting。如下图所示。问题陈述:有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中...

    python大数据-为什么Python编程非常适合大数据?.pdf

    这些库包含以下软件包: 数值计算 数据分析 统计分析 可视化 机器学习 Python与 与Hadoop的兼容性 的兼容性 Python和Hadoop都是开源⼤数据平台。 这就是为什么Python⽐其他编程语⾔更兼容Hadoop的原因。 您可以将...

    大数据建设与应用汇报.pptx

    当前状况:Hadoop分布式计算框架被广泛应用 胜出 大数据建设与应用汇报全文共26页,当前为第6页。 运营商到底有哪些数据 e7d195523061f1c01ef2b70529884c179423570dbaad84926380ABC1F97BAEF0C8FC051856578EAB7874501...

Global site tag (gtag.js) - Google Analytics