- 浏览: 23882 次
- 性别:
- 来自: 深圳
文章分类
最新评论
公司提了一个要求,要基于阿里云的ODPS实现一个简单的数据ETL Demo。
基本需求如下:多条身份证,姓名,来源部门信息遵循两条规则, 有权威部门则采用权威部门数据,无权威部门则采用出现次数多权重数据。
实现过程如下:
1.去阿里云申请accessID, accessKey
2.下载SDK开发工具
3.下载ODPS Eclipse插件并集成
4.仿造WordCount例子实现需求
具体表结构及代码
project=example_project
table=etl_in
columns=idcard:STRING,name:STRING,dept:STRING
522635****0114890X,张三,公安
522635****0114890X,张三,人社
522635****0114890X,张四,计生
522635****09122343,李四,计生
522635****09122343,张四,人社
522635****09122343,张四,综治
522635****09122343,张四,教育
622635****01148903,王五,公安
622635****01148903,王八,计生
622635****01148903,王八,民政
project=example_project
table=etl_out
columns=idcard:STRING,name:STRING
结果数据
522635****09122343,张四
522635****0114890X,张三
622635****01148903,王五
package com.aliyun.odps.examples.mr;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class EtlTools {
public static class TokenizerMapper extends MapperBase {
Record idCard;
Record name_dept;
Record result;
@Override
public void setup(TaskContext context) throws IOException {
idCard = context.createMapOutputKeyRecord();
name_dept = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
idCard.set(new Object[]{record.get("idcard").toString()});
name_dept.set(new Object[] {record.get("name").toString() + "_" + record.get("dept").toString()});
context.write(idCard, name_dept);
}
}
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
System.out.println("reduce key:" + key.get(0).toString());
Map<String, Integer> name2Dept = new HashMap<String, Integer>();
String finalName ="";
Integer countValue = 0;
while (values.hasNext()) {
Record val = values.next();
System.out.println("value: "+val.get(0));
String[] vals = (val.get(0).toString()).split("_");
String name = vals[0];
String dept = vals[1];
if("公安".equals(dept)) {//权威部门,规则一
finalName = name;
break;
}
if(name2Dept.containsKey(name)) {
Integer count = name2Dept.get(name) + 1;
name2Dept.put(name, count);
} else {
name2Dept.put(name, 1);
}
}
//规则二,权重,次数多
for(String name : name2Dept.keySet()) {
Integer val = name2Dept.get(name);
if (val.intValue() > countValue.intValue()) {
countValue = val;
finalName = name;
}
}
System.out.println("key: " + key.toString());
System.out.println("finalName: " + finalName);
count.set(new Object[] {finalName});
context.write(key, count);
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class SumReducer extends ReducerBase {
private Record result;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
Object obj = null;
while (values.hasNext()) {
Record val = values.next();
obj = val.get(0);
}
result.set(0, key.get(0));
result.set(1, obj);
context.write(result);
}
}
public static void main(String[] args) throws OdpsException {
JobConf conf = new JobConf();
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(SumCombiner.class);
conf.setReducerClass(SumReducer.class);
conf.setMapOutputKeySchema(SchemaUtils.fromString("idcard:string"));
conf.setMapOutputValueSchema(SchemaUtils.fromString("name:string"));
InputUtils.addTable(TableInfo.builder().tableName("etl_in").cols(new String[]{ "idcard", "name", "dept"}).build(), conf);
OutputUtils.addTable(TableInfo.builder().tableName("etl_out").build(), conf);
RunningJob job = JobClient.runJob(conf);
job.waitForCompletion();
}
}
基本需求如下:多条身份证,姓名,来源部门信息遵循两条规则, 有权威部门则采用权威部门数据,无权威部门则采用出现次数多权重数据。
实现过程如下:
1.去阿里云申请accessID, accessKey
2.下载SDK开发工具
3.下载ODPS Eclipse插件并集成
4.仿造WordCount例子实现需求
具体表结构及代码
project=example_project
table=etl_in
columns=idcard:STRING,name:STRING,dept:STRING
522635****0114890X,张三,公安
522635****0114890X,张三,人社
522635****0114890X,张四,计生
522635****09122343,李四,计生
522635****09122343,张四,人社
522635****09122343,张四,综治
522635****09122343,张四,教育
622635****01148903,王五,公安
622635****01148903,王八,计生
622635****01148903,王八,民政
project=example_project
table=etl_out
columns=idcard:STRING,name:STRING
结果数据
522635****09122343,张四
522635****0114890X,张三
622635****01148903,王五
package com.aliyun.odps.examples.mr;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class EtlTools {
public static class TokenizerMapper extends MapperBase {
Record idCard;
Record name_dept;
Record result;
@Override
public void setup(TaskContext context) throws IOException {
idCard = context.createMapOutputKeyRecord();
name_dept = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
idCard.set(new Object[]{record.get("idcard").toString()});
name_dept.set(new Object[] {record.get("name").toString() + "_" + record.get("dept").toString()});
context.write(idCard, name_dept);
}
}
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
System.out.println("reduce key:" + key.get(0).toString());
Map<String, Integer> name2Dept = new HashMap<String, Integer>();
String finalName ="";
Integer countValue = 0;
while (values.hasNext()) {
Record val = values.next();
System.out.println("value: "+val.get(0));
String[] vals = (val.get(0).toString()).split("_");
String name = vals[0];
String dept = vals[1];
if("公安".equals(dept)) {//权威部门,规则一
finalName = name;
break;
}
if(name2Dept.containsKey(name)) {
Integer count = name2Dept.get(name) + 1;
name2Dept.put(name, count);
} else {
name2Dept.put(name, 1);
}
}
//规则二,权重,次数多
for(String name : name2Dept.keySet()) {
Integer val = name2Dept.get(name);
if (val.intValue() > countValue.intValue()) {
countValue = val;
finalName = name;
}
}
System.out.println("key: " + key.toString());
System.out.println("finalName: " + finalName);
count.set(new Object[] {finalName});
context.write(key, count);
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class SumReducer extends ReducerBase {
private Record result;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
Object obj = null;
while (values.hasNext()) {
Record val = values.next();
obj = val.get(0);
}
result.set(0, key.get(0));
result.set(1, obj);
context.write(result);
}
}
public static void main(String[] args) throws OdpsException {
JobConf conf = new JobConf();
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(SumCombiner.class);
conf.setReducerClass(SumReducer.class);
conf.setMapOutputKeySchema(SchemaUtils.fromString("idcard:string"));
conf.setMapOutputValueSchema(SchemaUtils.fromString("name:string"));
InputUtils.addTable(TableInfo.builder().tableName("etl_in").cols(new String[]{ "idcard", "name", "dept"}).build(), conf);
OutputUtils.addTable(TableInfo.builder().tableName("etl_out").build(), conf);
RunningJob job = JobClient.runJob(conf);
job.waitForCompletion();
}
}
发表评论
-
Canal相关理解
2017-12-29 16:18 432转载:http://www.importnew.com/251 ... -
kettle部署
2017-12-26 16:04 6701.将jmbi sql先上生产环境, 参考附件jmbi.sql ... -
crontab定时运行MR不行,手动shell可以执行成功问题排查过程
2017-12-26 15:48 788设置了定时任务,但MR任务没有执行。 第一步:手动执行she ... -
Flume+kafka+Spark Steaming demo2
2017-11-22 13:15 433一,flume配置 # Name the components ... -
Flume+Kafka+Spark Steaming demo
2017-11-21 15:21 411一.准备flume配置 a1.sources = r1 a1. ... -
HBase表导出成HDFS
2017-10-19 19:40 860导出步骤:在old cluster上/opt/cloudera ... -
zepplin实战
2017-10-13 16:10 336一句话介绍Zeppelin 以笔记(Note)的形式展示的数据 ... -
Azkaban安装
2017-10-10 18:32 879一.下载 https://github.com/azkaban ... -
KYKIN安装
2017-09-30 17:35 121. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
KYKIN安装
2017-09-30 17:40 3331. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
Logstash安装部署配置
2017-04-28 10:24 958为了实现各业务平台日志信息采集到大数据平台hdf ... -
HBASE API
2017-04-18 11:01 443package org.jumore.test; impor ... -
Ambari卸载shell
2017-03-28 17:28 436#!/bin/bash # Program: # uni ... -
linux ssh 相互密码登录
2017-02-22 13:40 3601.修改集群各机器名称 vim /etc/sysconfig/ ... -
Kettle Linux 安装部署
2017-02-15 17:20 1294一.安装JDK环境:根据自己的linux系统选择相应的版本,比 ... -
hadoop环境搭建
2017-01-23 17:31 326192.168.23.231 server1 192.168. ... -
环境安装
2017-01-17 16:26 365物理机部署分配 3台物理机上部署 Zookeeper 3个,F ... -
Storm demo
2016-12-19 15:50 417public class SentenceSpout exte ... -
运行Hadoop jar 第三方jar包依赖
2016-08-22 13:47 963将自己编写的MapReduce程序打包成jar后,在运行 ha ... -
windows10下运行MR错误
2016-07-05 13:45 1577当在windows下运行MR程序时,会报各种错误。现把这次碰到 ...
相关推荐
【大纲】 ODPS 介绍 ODPS MapReduce MapReduce 作为开放服务
该文档来自于阿里巴巴数据平台事业部ODPS技术专家少杰 (徐东),在2014中国大数据技术大会大数据技术分论坛的演讲“ODPS MapReduce对外开放实践”。
ODPS MapReduce 实现和开放实践.zip
包含5个pdf文档(都是内部文档截止到2014-4-28未开放的): Map-Reduce SDK简介 — ODPS mapreduce快速入门 — ODPS MapReduce — ODPS 如何运行MapReduce — ODPS 应用限制 — ODPS
ODPS Eclipse插件 ...右键单击类WordCount(或类Resource)->运行方式-> ODPS Mapreduce->输入参数->完成 单击菜单栏中的运行->运行配置->右键单击ODPS Mapreduce->新建->输入参数->运行 本地Debug UDF
mapreduce - ch07, ch08 xlab - ch09,使用机器学习算法 use_sdk - ch10,使用ODPS SDK访问ODPS服务 as_dba - ch11,账户/资源/数据管理 数据 - 用于演示书中示例的数据 图像 - 书中的一些(彩色)图像 接触 任何...
阿里云ODPS(Open Data Processing Service)是一种大规模数据处理服务,提供了基于SQL的数据处理能力。ODPS SQL是ODPS的一部分,提供了类似于SQL的语法,用于处理大规模数据。 ODPS SQL的特点 ODPS SQL采用的是...
开放数据处理服务(Open Data Processing Service,ODPS)是基于飞天分布式系统构建的海 量数据处理和分析的服务平台,具有 PB 级别的数据处理能力, 主要适用于实时性要求不高 的海量数据处理,如数据分析、海量数据...
Java连接ODPS文档和代码
ODPS JDBC 驱动程序有两种方法。 1.第一个是使用独立库: 从 . 结帐。 2.二是依靠maven为你解决依赖: < dependency > < groupId >com.aliyun.odps</ groupId > < artifactId >odps-jdbc</ ...
阿里云odpsSql手册1
odps(MaxCompute) 权威详尽说明帮助手册,包括odps 底层优化原理
ODPS是分布式的海量数据处理平台,提供了丰富的数据处理功能和灵活的编程框架。...2.SQL:基于SQL92并进行了本地化扩展,可用于构建大规模数据仓库和企业BI系统,是应用最为广泛的一类服务。3.DAG编程模型:类似Hadoo
用来连接odps的客户端,可以在ideal中引入该插件进行连接odps
阿里的odps的使用说明,简单快速上手,希望对新手有一些帮助
ODPS
阿里巴巴的 odps 文档,帮助新手快速扫盲 本资料共包含以下附件: odps_SQL.pdf
odps的eclipse插件
odps权威指南最新版
阿里odps开放平台的参考手册,里面详细介绍了odps的操作