- 浏览: 183432 次
- 性别:
- 来自: 杭州
博客专栏
-
Java技术分享
浏览量:0
文章分类
最新评论
-
masuweng:
学习了,学习了
mybatis是如何防止SQL注入的 -
somefuture:
终于知道了#$的区别
mybatis是如何防止SQL注入的 -
masuweng:
...
tomct处理请求的流程 -
zhp8341:
masuweng 写道寻求cas的更多例子, http://w ...
JUC之CAS -
臻是二哥:
java.util.concurrent包中到处都使用了CAS ...
JUC之CAS
Hadoop对关系数据库无非两种操作,即从关系数据库输入到HDFS和从HDFS输出到关系数据库。Hadoop中分别提供了DBInputFormat类和DBOutputFormat类,前者用于从关系数据库输入到HDFS,该类将关系数据库中的一条记录作为向Mapper输入的value值,后者用于将HDFS中的文件输出到关系数据库,该类将Reducer输出的key值存储到数据库。我们只要在主程序中设置job的输入输出格式为这两个类中的一种,就可以让Hadoop从关系数据库输入或者向关系数据库输出。
正如我上面提到的,我们在操作的过程中使用了“记录”这个对象,因此需要写一个类对应到关系数据库中我们要操作的那个表,这个类要实现DBWritable接口和Writable接口,具体参见HadoopAPI。
具体代码参见文档。
import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.lib.db.*; import java.sql.*; import java.io.*; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.*; import org.apache.hadoop.fs.Path; public class SDBConnInput { public static class CustomerRecord implements Writable,DBWritable{ String customerID; String customerName; String phoneNumber; public void readFields(ResultSet resultSet) throws SQLException{ customerID=resultSet.getString(1); customerName=resultSet.getString(2); phoneNumber=resultSet.getString(3); } public void write(PreparedStatement statement) throws SQLException{ statement.setString(1, customerID); statement.setString(2, customerName); statement.setString(3,phoneNumber); } public void readFields(DataInput in) throws IOException{ customerID=in.readUTF(); customerName=in.readUTF(); phoneNumber=in.readUTF(); } public void write(DataOutput out) throws IOException{ out.writeUTF(customerID); out.writeUTF(customerName); out.writeUTF(phoneNumber); } public void setCustomerID(String customerID){ this.customerID=customerID; } public void setCustomerName(String customerName){ this.customerName=customerName; } public void setPhoneNumber(String phoneNumber){ this.phoneNumber=phoneNumber; } public String toString(){ return this.customerID+","+this.customerName+","+this.phoneNumber; } } public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{ Text result= new Text(); public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{ result.set(value.toString()); collector.collect(key, result); } } public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{ public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{ String str=""; while(values.hasNext()){ str+=values.next().toString(); } output.collect(null, new Text(str)); } } public static void main(String [] args) throws Exception{ /** * 从关系数据库读取数据到HDFS */ JobConf job = new JobConf(); job.setJarByClass(SDBConnInput.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setInputFormat(DBInputFormat.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out")); DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.0.25:3306/hadoop","root","1117"); String fieldNames []={"customerID","customerName","phoneNumber"}; DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames); job.setMapperClass(MapperClass.class); job.setReducerClass(ReducerClass.class); JobClient.runJob(job); } }
import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.lib.db.*; import java.sql.*; import java.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.filecache.*; public class SDBConnOutput { public static class CustomerRecord implements Writable,DBWritable{ String customerID; String customerName; String phoneNumber; public void readFields(ResultSet resultSet) throws SQLException{ customerID=resultSet.getString(1); customerName=resultSet.getString(2); phoneNumber=resultSet.getString(3); } public void write(PreparedStatement statement) throws SQLException{ statement.setString(1, customerID); statement.setString(2, customerName); statement.setString(3,phoneNumber); } public void readFields(DataInput in) throws IOException{ customerID=in.readUTF(); customerName=in.readUTF(); phoneNumber=in.readUTF(); } public void write(DataOutput out) throws IOException{ out.writeUTF(customerID); out.writeUTF(customerName); out.writeUTF(phoneNumber); } public void setCustomerID(String customerID){ this.customerID=customerID; } public void setCustomerName(String customerName){ this.customerName=customerName; } public void setPhoneNumber(String phoneNumber){ this.phoneNumber=phoneNumber; } public String toString(){ return this.customerID+","+this.customerName+","+this.phoneNumber; } } public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{ CustomerRecord customer=new CustomerRecord(); public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter) throws IOException{ String [] strs=value.toString().split(","); customer.setCustomerID(strs[0]); customer.setCustomerName(strs[1]); customer.setPhoneNumber(strs[2]); collector.collect( customer,value); } } /** *将HDFS中的文件输出到数据库 */ public static void main(String [] args) throws Exception{ /** * 从关系数据库读取数据到HDFS */ JobConf job = new JobConf(SDBConnInput.class); //DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中 job.setOutputFormat(DBOutputFormat.class); FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt")); DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.0.25:3306/hadoop","root","1117"); String fieldNames []={"customerID","customerName","phoneNumber"}; DBOutputFormat.setOutput(job, "customers", fieldNames); job.setMapperClass(MapperClass.class); job.setNumReduceTasks(0); JobClient.runJob(job); } }
注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称/lib,并且在提交job前,添加语句DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"),conf);
以上方法使用与所有需要额外jar包的MapReduce代码。
- mysql-connector-java-5.1.22-bin.jar (813.4 KB)
- 下载次数: 3
发表评论
-
Hive安装
2014-11-21 20:43 1123Hive安装 hive是基于Hadoop的一个数据仓库工具 ... -
Pig安装
2014-11-13 16:20 1112Pig有两种使用模式:本地模式和MapReduce模式。 ... -
通过全局文件复制实现多数据源的Map端连接
2014-11-10 17:45 1292在DataJoin实现多数据源reduce端连接的过程中,连接 ... -
用DataJoin实现多数据源的Reduce端链接
2014-11-08 16:51 1321DataJoin是Hadoop处理多数据源问题的一个jar包, ... -
hadoop之用户定制
2014-11-04 09:30 1731Hadoop提供了9中内置数据类型,分别为: BooleanW ... -
使用复合键优化倒排索引
2014-11-03 11:18 1607巧用复合键优化倒排索引程序 之前写了一个倒排索引的程序,但 ... -
倒排索引
2014-10-31 11:49 1478倒排索引是文档检索系统中最常见的数据结构,被广泛的应用于搜索 ... -
Reducer多少个最佳
2014-10-29 20:29 875从MapReduce框架的执行流程,我们知道,输入文件会被分 ... -
从WordCount看MapReduce框架执行流程
2014-10-29 16:51 3546代码如下: import java.io.IOExcept ... -
第一个hadoop程序-WordCount
2014-10-28 20:46 1938首先说明一下环境:我在前面的博客中搭建的hadoop平台,具 ... -
HDFS可靠性措施
2014-10-27 08:21 2865HDFS可靠性措施 一、 ... -
Win7上的Eclipse3.3远程连接ubuntu14.04中的hadoop0.20.2
2014-10-24 19:15 1010Win7上的Eclipse3.3远程连 ... -
ubuntu14.04的hadoop环境搭建(全分布模式)
2014-10-20 10:53 1576hadoop0.20.2软件下载http://pan.ba ... -
ubuntu实现无密码登陆
2014-10-19 10:43 3480环境说明: 打在ubuntu系统的两台计算机mas ... -
Ubuntu14.04安装jdk1.7.0_71
2014-10-17 19:41 2742Ubuntu14.04安装jdk1.7.0_71 将位于~ ... -
ubuntu14.04设置静态IP
2014-09-19 21:48 6524最近在研究集群,于是弄了几台破电脑装ubuntu,结果ubun ...
相关推荐
Amazon网络服务提供的托管Hadoop集群的运行方式,以及它与用户直接管理的Hadoop集群有何区别, Hadoop与关系数据库的融合,使用Hive执行SQL查询,使用Sqoop迁移数据, 组成Hadoop生态系统的其他项目和工具,以及...
Hadoop 可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理 ...本文对Hadoop 和关系型数据库进行了比较分析,讨论了将二者结 合构建海量数据分析系统的可行性,同时给出了实际的应用场景
A.4实验四:NoSQL和关系数据库的操作比较 本实验对应第6章的内容。 A.4.1实验目的 (1)理解4种数据库(MySQL、HBase,Redis和 MongoDB)的概念及不同点。(2)熟练使用4种数据库操作常用的 Shell命令。 (3)熟悉4种...
Hadoop可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理...本文对Hadoop和关系型数据库进行了比较分析,讨论了将二者结合构建海量数据分析系统的可行性,同时给出了实际的应用场景。
Hadoop集群之—MySQL关系数据库_V1.0,很详细很好的Hadoop资料,与大家分享,下载后给个赞,谢谢!
1、 hadoop官方网站,首页会有最新动态。 2、 Nutch ->谷歌GFS论文->doug 根据GFS设计了NDFS、06年启动hadoop项目。 3、 环境支持 操作系统 Linux 、WINDOWS-Cygwin、hadoop-for-windows JDK支持 下载jdk,解压jdk...
当大数据存储和Hadoop生态系统的MapReduce,Hive,HBase等分析器出现时,他们需要一种工具来与关系数据库服务器进行交互,以导入和导出驻留在其中的大数据。Sqoop在Hadoop生态系统中占据一席之地,为关系数据库...
大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第10期_MySQL关系数据库 共47页.pdf
Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2期)_机器信息分布表 Hadoop_Hadoop集群(第4期...Hadoop_Hadoop集群(第10期)_MySQL关系数据库 Web(Json-Lib类库使用手册)
Hadoop集群·CentOS安装配置(第1期) ...Hadoop集群·MySQL关系数据库(第10期) Hadoop集群·常用MySQL数据库命令(第10期副刊) Hadoop集群·HBase简介及安装(第11期) Hadoop集群·HBase之旅(第11期副刊)
Hadoop集群(第10期)MySQL关系数据库汇编.pdf
HBase 和传统关系数据库不同,它采用了 BigTable 的数据模型增强的稀疏排序映射表(Key/Value ),其中,键由行关键字、列关键字和时间戳构成。 HBase 提供了对大规模数据的随机、实时读写访问。HBase 的目标是存储...
Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,...
基于Hadoop和关系型数据库混合构架,提出新型用采数据服务架构。对平台的高可用性、系统监控、IaaS部署等进行分析,提升了系统 的可靠性,降低了运维难度。提出了可扩展的数据预处理过程和数据质量管理模型,保障了...
Sqoop是一个用于在Hadoop和关系数据库或大型机之间传输数据的工具。您可以使用Sqoop将关系数据库管理系统(RDBMS)中的数据导入Hadoop分布式文件系统(HDFS),转换Hadoop MapReduce中的数据,然后将数据导出回RDBMS...
模块二 Hadoop生态系统介绍和演示 Hadoop HDFS 和 MapReduce Hadoop数据库之HBase Hadoop数据仓库之Hive Hadoop数据处理脚本Pig Hadoop数据接口Sqoop和Flume,Scribe DataX Hadoop工作流引擎 Oozie 运用Hadoop...
另一方面,关系数据库对于结构化数据的快速查询的能力却是Hadoop 没有的。因此,企业 的数据总是存储于关系型数据库中,以满足快速查询的需要。尽管同时使用Hadoop 和关系 10 数据库可以弥补彼此的不足。然而令人...
Hadoop相较于其他系统的优势-关系型数据库管理系统2022优秀文档.pptx
书籍目录: 第一部分 Hadoop——一种分布式编程框架 第1章 Hadoop简介 1.1 为什么写《Hadoop 实战》 1.2 什么是Hadoop 1.3 了解分布式系统和Hadoop 1.4 比较SQL 数据库和Hadoop 1.5 理解MapReduce 1.5.1 动手...
本书是Hadoop权威参考,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装与运行Hadoop集群。 目录 第1章 初识Hadoop 数据!数据! 数据存储与分析 与其他系统相比 关系型数据库管理系统...