MapReduce处理数据的基本原则之一就是将输入数据分割成片(split),按片读取数据,每个分片交由一个Mapper去做处理。注意,输入分片只是一种逻辑划分,有别于hdfs的数据分块(block),数据分块是数据的物理划分。InputFormat类抽象出了数据分片和读取这两个操作,具体实现交由子类去完成,除了hadoop默认提供的一些子类之外,我们可以自己根据实际需要进行扩展。
下图列出了涉及MapReduce读取数据的几个核心类以及常见的几种扩展。
如图所示,InputFormat类抽象了两个方法,创建分片的getSplit( )和创建数据读取工具的createRecordReader( ),可以扩展InputFormat重写这两个方法来实现不同数据源读取数据,或者采用不同的方式读取数据。
InputSplit表示数据的逻辑分片,最常见的是用于表示文本文件分片的FileSplit类,该类扩展了InputSpit,包含了文件的路径、分片起始位置在源文件中的字节偏移量、分片的字节长度以及分片所属文件块存在数据节点信息。用于分片信息在客户端提交作业时会被序列化到文件然后提交,并且在作业执行中会被反序列化,所以FileSplit还实现了Writable接口,实现了write(DataOutput out)和readFields(DataInput in)两个方法。
正真为MapReduce提供数据输入的是RecordReader类,给它分配一个分片,它就从数据源中读取分片指定的数据段、并将数据组织成指定的数据结构。
hadoop默认提供的一些数据读取类基本可以满足多数需求,特殊情况下我们也可以自己扩展。以下用一个简单了例子介绍扩展方式。
扩展的目的是从多个不同类型的数据库(mysql、oracle、db2等)、或者多张表读取数据。代码如下:
public class MultiTableInputSplit extends DBInputSplit implements Writable{ //查询sql private String inputSql; //数据库链接url private String dbConUrl; //数据库用户名 private String userName; private String passWord; //数据库类型 private String dbType; //作业执行过程中通过反射创建实例,所以必须有无参构造函数 public MultiTableInputSplit(){ } public MultiTableInputSplit(long start, long end, String intputSql, String dbConUrl, String userName, String passWord, String dbType) { super(start, end); this.inputSql = intputSql; this.dbConUrl = dbConUrl; this.userName = userName; this.passWord = passWord; this.dbType = dbType; } @Override public void write(DataOutput output) throws IOException { super.write(output); output.writeUTF(inputSql); output.writeUTF(dbConUrl); output.writeUTF(userName); output.writeUTF(passWord); output.writeUTF(dbType); } @Override public void readFields(DataInput input) throws IOException { super.readFields(input); this.inputSql = input.readUTF(); this.dbConUrl = input.readUTF(); this.userName = input.readUTF(); this.passWord = input.readUTF(); this.dbType = input.readUTF(); } //get、set等方法省略 }
MultiInputSplit 类间接扩展了InputSplit类,添加了数据库连接信息和查询数据所使用的sql语句。
public class MultiTableInputFormat extends InputFormat<LongWritable, MapDBWritable>{ /** * 单表分片 * sqlInfo字符串的格式为:单个sqlInfo里面各个属性之间用"##"分隔,如: * dbType##driver##url##user##password##sql##counts * 多个sqlInfo之间用"#_#"分隔,如: * sqlInfo1#_#sqlInfo2 */ @Override public List<InputSplit> getSplits(JobContext job) throws IOException { List<InputSplit> splits = new ArrayList<InputSplit>(); String inputQuery = job.getConfiguration().get("sqlInfo"); String[] inputQueryArr = inputQuery.split("#_#"); for(String sql : inputQueryArr){ getSplit(sql, job, splits); } return splits; } @Override public RecordReader<LongWritable, MapDBWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { try { return new MultiTableRecordReader((MultiTableInputSplit)split, context.getConfiguration()); } catch (SQLException ex) { throw new IOException(ex.getMessage()); } } /** * 可以根据表的数量、表的大小控制分片的数量 */ private int getSplitCount(String[] sqlInfo) { return 1; //简单实现 } /** * 计算分片的大小 */ private int getSplitSize(String[] sqlInfo){ return 100000; //简单实现 } public void getSplit(String inputQuery, JobContext job, List<InputSplit> splits){ String[] sql = inputQuery.split("##"); int recordCount = Integer.parseInt(sql[6]); long splitCount = getSplitCount(sql); long splitSize = getSplitSize(sql); for (int i = 0; i < splitCount; i++) { InputSplit split; if (i + 1 == splitCount) { split = new MultiTableInputSplit(i * splitSize, recordCount + 1, sql[5], sql[2], sql[3], sql[4], sql[0]); } else { split = new MultiTableInputSplit(i * splitSize, i * splitSize + splitSize, sql[5], sql[2], sql[3], sql[4], sql[0]); } splits.add(split); } } }
class MultiTableRecordReader extends RecordReader<LongWritable, MapDBWritable> { protected ResultSet results = null; protected DBInputFormat.DBInputSplit split; protected long pos = 0; protected LongWritable key = null; protected MapDBWritable value = null; protected String dbType; protected Connection connection; protected Statement statement; public MultiTableRecordReader(DBInputFormat.DBInputSplit split, Configuration conf) throws SQLException { this.split = split; initConnection(); //初始化数据库链接 } @Override public boolean nextKeyValue() throws IOException { try { if (key == null) { key = new LongWritable(); } if (value == null) { value = new MapDBWritable(); } if (null == this.results) { this.results = executeQuery(getSelectQuery()); } if (!results.next()) { return false; } key.set(pos + split.getStart()); value.readFields(results); pos++; } catch (SQLException e) { throw new IOException("SQLException in nextKeyValue", e); } return true; } @Override public LongWritable getCurrentKey() { return key; } @Override public MapDBWritable getCurrentValue() { return value; } @Override public float getProgress() throws IOException { return pos / (float) split.getLength(); } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { } /** * 根据不同的数据库类型实现不同的分页查询 */ protected String getSelectQuery() { StringBuilder query = new StringBuilder(); try { DBInputFormat.DBInputSplit split = getSplit(); MultiTableInputSplit extSplit = (MultiTableInputSplit) split; if (extSplit.getLength() > 0 && extSplit.getStart() >= 0) { query.append(extSplit.getInputSql()); String dbType = ((MultiTableInputSplit)split).getDbType(); if(dbType.equalsIgnoreCase("TERADATA")){ query.append(" QUALIFY ROW_NUM>=").append(split.getStart()) .append(" AND ROW_NUM<").append(split.getEnd()); } if(dbType.equalsIgnoreCase("ORACLE")){ query.append(" WHERE ROW_NUM>=").append(extSplit.getStart()) .append(" AND ROW_NUM<").append(extSplit.getEnd()); } else{ query.append(" LIMIT ").append(extSplit.getStart()) .append(" ,").append(extSplit.getEnd()); } } } catch (IOException ex) { ex.printStackTrace(); } return query.toString(); } public void initConnection() throws SQLException{ MultiTableInputSplit sp = (MultiTableInputSplit)split; String conUrl = sp.getDbConUrl(); String userName = sp.getUserName(); String passWord = sp.getPassWord(); connection = DriverManager.getConnection(conUrl, userName, passWord); statement= connection.createStatement(); } @Override public void close() throws IOException { try { if (null != results) { results.close(); } if (null != statement) { statement.close(); } if (null != connection) { connection.close(); } } catch (SQLException e) { throw new IOException(e.getMessage()); } } protected ResultSet executeQuery(String query) throws SQLException { return statement.executeQuery(query); } public Connection getConnection() { return connection; } public DBInputFormat.DBInputSplit getSplit() { return split; } protected void setStatement(PreparedStatement stmt) { this.statement = stmt; } }
public class MapDBWritable implements DBWritable{ /** * 表中的列名称--->列值 */ private Map<String, Object> values = null; /** * 表中的列名称 */ private String[] colNames; /** * 列名称--->字段数据类型 */ private Map<String, String> colType; public MapDBWritable(){ } public void readFields(ResultSet resultSet) throws SQLException { ResultSetMetaData meta = resultSet.getMetaData(); int count = meta.getColumnCount(); colNames = new String[count]; values = new HashMap<String, Object>(count); colType = new HashMap<String, String>(count); for (int i = 0; i < count; i++) { String colName = meta.getColumnName(i + 1); colNames[i] = colName; values.put(colName, resultSet.getString(colName)); colType.put(colName, meta.getColumnTypeName(i + 1)); } } @Override public String toString() { StringBuilder builder = new StringBuilder(); for (Map.Entry<String, Object> entry : this.values.entrySet()) { builder.append(entry.getKey() + "->" + entry.getValue()); builder.append(";"); } return builder.substring(0, builder.length() - 1); } public void write(PreparedStatement preparedstatement) throws SQLException { } public Map<String, Object> getValues() { return values; } public void setValues(Map<String, Object> values) { this.values = values; } public String[] getColNames() { return colNames; } public void setColNames(String[] colNames) { this.colNames = colNames; } public Map<String, String> getColType() { return colType; } public void setColType(Map<String, String> colType) { this.colType = colType; } }
MapDBWritable从ResultSet中读取列的类型信息保存在以列名为key类型为value的map中,读取列值保存在以列名为key列值为value的map中,Mapper的输入就为MapDBWritable。
通过扩展以上几个类就可以从多个数据库、多个表读取数据了。
相关推荐
在大数据场景下,有时需要将MapReduce处理的结果存储到关系型数据库中,或者从数据库中读取数据进行处理。Hadoop提供了JDBC(Java Database Connectivity)接口,使得MapReduce作业能够与数据库进行连接和操作。 要...
总结一下,这个“MapReduce读取单词个数”的案例是一个典型的Hadoop MapReduce应用,它展示了如何利用分布式计算处理大量文本数据,统计其中的单词出现频率。整个流程涉及Maven项目构建、生成jar包、上传到Hadoop...
MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。数据清洗(Data Cleaning)是数据处理过程中非常重要的一步,旨在清洁和转换原始数据,使其更加可靠和有用。...
mapreduce案例测试数据
MapReduce是一种编程模型,用于处理和生成大数据集的并行运算。它由Google提出,并被Apache Hadoop框架广泛采用。MapReduce模型将复杂的数据处理过程分解为两个阶段:Map(映射)和Reduce(归约)。简单来说,Map...
MapReduce是Apache Hadoop的核心组件之一,主要用于处理和分析大规模数据。在这个名为“mapreduce气象数据(用于测试)”的压缩包中,我们有一个可能是用于教学目的的数据集,旨在帮助初学者理解如何在Hadoop环境下...
Pandas可以对数据进行读取、处理和分析,实现数据的快速处理和分析。 电影排名和推荐系统的开发可以分为以下几步: 1. 数据导入:使用Pandas从电影数据库中导入电影数据。 2. 数据预处理:使用Pandas对电影数据...
【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...
MapReduce--->实现简单的数据清洗需要的数据文件
在MapReduce程序的最后,Reducer产生的输出会被写入到HDFS(Hadoop Distributed File System),然后可以读取这些数据并生成图表或报表,以便直观地展示统计结果。 总结来说,这个实例涵盖了MapReduce的基本用法,...
在大数据处理领域,Java语言因其稳定性和可扩展性,成为了实现MapReduce算法的首选工具。MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个项目"0324大数据代码与数据_JAVA...
本案例中,我们关注的是如何使用MapReduce处理1901年和1902年的天气源数据,找出这两年内每天的最大温度和最低温度。 首先,`MrTemperature.java`是实现MapReduce任务的主要Java源代码文件。在这个文件中,我们将...
- **Map 程序调用用户 map() 方法**:每当 Map 程序读取一行数据时,就会调用用户的 `map()` 方法,并将这一行数据的起始偏移量作为 key,数据内容作为 value 传入。 - **Reduce 程序调用用户 reduce() 方法**:...
1. 数据读取:从HDFS(Hadoop分布式文件系统)中读取原始YouTube数据。 2. 数据切分:将大文件分割成多个键值对,键可能是视频ID或时间戳,值可能包含用户行为的具体信息。 3. 数据映射:对每个键值对执行自定义逻辑...
3. **MapTask**:MapTask根据InputFormat读取数据,执行Map函数,生成中间键值对。 4. **Shuffle阶段**:MapTask产生的中间结果会按照键值进行分区和排序,然后传输给ReduceTask。 5. **ReduceTask**:ReduceTask...
1. 读取每一行数据,解析出日期和其他气象指标。 2. 将日期与对应的温度、湿度打包成键值对。 3. 输出键值对,准备进入Shuffle和Sort阶段。 Shuffle和Sort阶段是MapReduce流程中的一个重要环节,它确保相同键的值会...
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。Hadoop是Apache开源项目,它实现了MapReduce模型,为大数据处理提供了可靠的分布式计算框架。在这个场景中,我们将讨论如何...
首先,Map阶段读取原始招聘数据,对每条记录进行初步处理,如去除空白行和格式化日期。接下来,Reduce阶段合并和去重数据,确保每个职位信息唯一且完整。该过程大幅提升了数据处理效率,尤其适用于分布式计算环境。 ...
MapReduce是一种分布式编程模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的并行计算分解为两个主要阶段:Map(映射)和Reduce(化简)。在Hadoop框架中,MapReduce被广泛应用于大...