一、背景
为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。
推荐阅读:
Hadoop 中利用 MapReduce 读写 MySQL 数据
http://www.linuxidc.com/Linux/2013-07/88117.htm
二、技术细节
1、DBInputFormat(Mysql为例),先创建表:
CREATE TABLE studentinfo (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(32) NOT NULL);
2、DBInputFormat用法如下:
public class DBInput {
// DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
// CREATE TABLE studentinfo (
// id INTEGER NOT NULL PRIMARY KEY,
// name VARCHAR(32) NOT NULL);
public static class StudentinfoRecord implements Writable, DBWritable {
int id;
String name;
public StudentinfoRecord() {
}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}
public String toString() {
return new String(this.id + " " + this.name);
}
}
public class DBInputMapper extends MapReduceBase implements
Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
public void map(LongWritable key, StudentinfoRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
collector.collect(new LongWritable(value.id), new Text(value
.toString()));
}
}
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(DBInput.class);
DistributedCache.addFileToClassPath(new Path(
"/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
conf.setMapperClass(DBInputMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
String[] fields = { "id", "name" };
DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
null, "id", fields);
JobClient.runJob(conf);
}
}
a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。
实现Writable的方法:
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
实现DBWritable的方法:
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}
b)读入Mapper的value类型是StudnetinfoRecord。
c)配置如何连入数据库,读出表studentinfo数据。
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
String[] fields = { "id", "name" };
DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo", null, "id", fields);
3、DBOutputFormat用法如下:
public class DBOutput {
public static class StudentinfoRecord implements Writable, DBWritable {
int id;
String name;
public StudentinfoRecord() {
}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}
public String toString() {
return new String(this.id + " " + this.name);
}
}
public static class MyReducer extends MapReduceBase implements
Reducer<LongWritable, Text, StudentinfoRecord, Text> {
public void reduce(LongWritable key, Iterator<Text> values,
OutputCollector<StudentinfoRecord, Text> output, Reporter reporter)
throws IOException {
String[] splits = values.next().toString().split("/t");
StudentinfoRecord r = new StudentinfoRecord();
r.id = Integer.parseInt(splits[0]);
r.name = splits[1];
output.collect(r, new Text(r.name));
}
}
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(DBOutput.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(DBOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("/hua/hua.bcp"));
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/Hadoop", "hua", "hadoop");
DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");
conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
conf.setReducerClass(MyReducer.class);
JobClient.runJob(conf);
}
}
a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口,同.DBInputFormat的StudnetinfoRecord类。
b)输出Reducer的key/value类型是StudnetinfoRecord。
c)配置如何连入数据库,输出结果到表studentinfo。
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");
三、总结
运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个
tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。
2.a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /lib
b)在mr程序提交job前,添加语句:istributedCache.addFileToClassPath(new Path("/lib/mysql- connector-java- 5.1.0-bin.jar"), conf);
分享到:
相关推荐
hadoop中Map-Reduce使用示例,输入(DBInputFormat),输出(DBOu-MR_HBase
hadoop的hadoop.dll和winutils.exe下载
Hadoop是一个主要由Java语言开发的项目,基于Hadoop的MapReduce程序也主要是使用Java语言来编写。...经过调研,在MapReduce任务中使用C++程序的方法主要有三种:Hadoop Streaming、Hadoop Pipes以及Hadoop JNI。
Hadoop 是一个能够让用户轻松搭建和使用的分布式计算平台,能 够让用户轻松地在 Hadoop 上开发和运行处理海量数据的应用程序。 Hadoop 架构有两个主要的组件:分布式文件系统 HDFS 和 MapReduce 引擎。 在 Hadoop 中...
在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path...解决方案:下载本资源解压将hadoop.dll和winutils.exe文件复制到hadoop2.7.3的bin目录下即可解决。
ES和HADOOP使用问题和需求
支持如下版本的Hadoop hadoop-2.6.0 hadoop-2.6.3 hadoop-2.6.4 hadoop-2.7.1 hadoop-2.8.1 hadoop-2.8.3 hadoop-3.0.0
Hadoop介绍,HDFS和MapReduce工作原理
hadoop调试工具hadoop.dll和hadoop.exp和winutils.exe 64位
hadoop权威指南4和源码hadoop权威指南4和源码hadoop权威指南4和源码hadoop权威指南4和源码
hadoop.dll 和 winutils.exe
windows平台上,使用Eclipse hadoop插件,开发基于hdfs文件的中文分词统计和排序功能,以唐诗三百首为例,找出其中使用频率最高的词语。
hadoop2.7.2在windows环境中相关依赖文件hadoop.dll和winutils.exe
摘要 Hadoop 是一个处理、存储和分析海量的...Hadoop 和 Google 内部使用的分布式计算系统原理相同,其开源特性使其成为分布式计算系统的事实上的国际标准。 Yahoo、Facebook、Amazon,以及国内的百度、阿里巴巴等众多
hadoop hadoop的hadoop.dll和winutils.exe 解决方法, 把winutils.exe加入你的hadoop-x.x.x/bin下 Could not locate executable null\bin\winutils.exe in the Hadoop binaries
项目负责人tomwhite透过本书详细阐述了如何使用hadoop构建可靠、可伸缩的分布式系统,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装和运行hadoop集群。 本书结合丰富的案例来展示如何用hadoop...
在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’ winutils.exe,并放入Hadoop解压文件夹的bin目录内 hadoop.dll,并放入:C:/Windows/System32 文件夹内 如果配置出现问题,...
Hadoop使用常见问题以及解决方法,简单实用
下面的文档是一些概念介绍和操作教程,可帮助你开始使用Hadoop。如果遇到了问题,你可以向邮件列表求助或者浏览一下存档邮件。 Hadoop快速入门 Hadoop集群搭建 Hadoop分布式文件系统 Hadoop Map-Reduce教程 ...
hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2安装和配置hadoop1.1.2...