`

基于ODPS的MapReduce例子

阅读更多
公司提了一个要求,要基于阿里云的ODPS实现一个简单的数据ETL Demo。
基本需求如下:对多条(身份证,姓名,来源部门)记录按两条规则去重合并——若存在权威部门的数据,则采用权威部门的数据;若无权威部门的数据,则采用出现次数最多的数据。
实现过程如下:
1.去阿里云申请accessID, accessKey
2.下载SDK开发工具
3.下载ODPS Eclipse插件并集成
4.仿造WordCount例子实现需求

具体表结构及代码

project=example_project
table=etl_in
columns=idcard:STRING,name:STRING,dept:STRING

522635****0114890X,张三,公安
522635****0114890X,张三,人社
522635****0114890X,张四,计生
522635****09122343,李四,计生
522635****09122343,张四,人社
522635****09122343,张四,综治
522635****09122343,张四,教育
622635****01148903,王五,公安
622635****01148903,王八,计生
622635****01148903,王八,民政


project=example_project
table=etl_out
columns=idcard:STRING,name:STRING
结果数据
522635****09122343,张四
522635****0114890X,张三
622635****01148903,王五



package com.aliyun.odps.examples.mr;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class EtlTools {

  public static class TokenizerMapper extends MapperBase {

    Record idCard;
    Record name_dept;
    Record result;

    @Override
    public void setup(TaskContext context) throws IOException {
      idCard = context.createMapOutputKeyRecord();
      name_dept = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {         
    idCard.set(new Object[]{record.get("idcard").toString()});
    name_dept.set(new Object[] {record.get("name").toString() + "_" + record.get("dept").toString()});
    context.write(idCard, name_dept);    
    }
  }

  /**
   * A combiner class that combines map output by sum them.
   */
  public static class SumCombiner extends ReducerBase {
    private Record count;

    @Override
    public void setup(TaskContext context) throws IOException {
      count = context.createMapOutputValueRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
   
      System.out.println("reduce key:" + key.get(0).toString());
      Map<String, Integer>  name2Dept = new HashMap<String, Integer>();
      String finalName ="";
      Integer countValue = 0;
      while (values.hasNext()) {
        Record val = values.next();
        System.out.println("value: "+val.get(0));
        String[] vals = (val.get(0).toString()).split("_");
        String name = vals[0];
        String dept = vals[1];
        if("公安".equals(dept)) {//权威部门,规则一
        finalName = name;
        break;
        }
      
        if(name2Dept.containsKey(name)) {
        Integer count = name2Dept.get(name) + 1;
        name2Dept.put(name, count);
       
        } else {
        name2Dept.put(name, 1);
        }               
    
      }
   
      //规则二,权重,次数多
      for(String name : name2Dept.keySet()) {
    Integer val =  name2Dept.get(name);
    if (val.intValue() > countValue.intValue()) {
    countValue = val;
    finalName = name;
    }    
     
      }
     
      System.out.println("key: " + key.toString());
      System.out.println("finalName: " + finalName);
      count.set(new Object[] {finalName});
      context.write(key, count);
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class SumReducer extends ReducerBase {
    private Record result;
 

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();      
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {

      Object obj = null;
      while (values.hasNext()) {
        Record val = values.next();
        obj = val.get(0);   
      }
      result.set(0, key.get(0));
      result.set(1, obj);
      context.write(result);
    }
  }

public static void main(String[] args) throws OdpsException {
JobConf conf = new JobConf();
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(SumCombiner.class);
conf.setReducerClass(SumReducer.class);
conf.setMapOutputKeySchema(SchemaUtils.fromString("idcard:string"));
conf.setMapOutputValueSchema(SchemaUtils.fromString("name:string"));
InputUtils.addTable(TableInfo.builder().tableName("etl_in").cols(new String[]{ "idcard", "name", "dept"}).build(), conf);
OutputUtils.addTable(TableInfo.builder().tableName("etl_out").build(), conf);

RunningJob  job = JobClient.runJob(conf);
job.waitForCompletion();
}

}



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics