`
臻是二哥
  • 浏览: 183432 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
博客专栏
Group-logo
Java技术分享
浏览量:0
社区版块
存档分类
最新评论

Hadoop与关系数据库

 
阅读更多

Hadoop对关系数据库无非两种操作,即从关系数据库输入到HDFS和从HDFS输出到关系数据库。Hadoop中分别提供了DBInputFormat类和DBOutputFormat类,前者用于从关系数据库输入到HDFS,该类将关系数据库中的一条记录作为向Mapper输入的value值,后者用于将HDFS中的文件输出到关系数据库,该类将Reducer输出的key值存储到数据库。我们只要在主程序中设置job的输入输出格式为这两个类中的一种,就可以让Hadoop从关系数据库输入或者向关系数据库输出。
正如我上面提到的,我们在操作的过程中使用了“记录”这个对象,因此需要写一个类对应到关系数据库中我们要操作的那个表,这个类要实现DBWritable接口和Writable接口,具体参见HadoopAPI。
具体代码参见文档。
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;

public class SDBConnInput {
	public static class CustomerRecord implements Writable,DBWritable{
		String customerID;
		String customerName;
		String phoneNumber;
		public void readFields(ResultSet resultSet)  throws SQLException{
			customerID=resultSet.getString(1);
			customerName=resultSet.getString(2);
			phoneNumber=resultSet.getString(3);
			}
		public void write(PreparedStatement statement)  throws SQLException{
			statement.setString(1, customerID);
			statement.setString(2, customerName);
			statement.setString(3,phoneNumber);
		}
			 
		 public void readFields(DataInput in) throws IOException{
			 customerID=in.readUTF();
			 customerName=in.readUTF();
			 phoneNumber=in.readUTF();
		 }
		 public void write(DataOutput out) throws IOException{
			 out.writeUTF(customerID);
			 out.writeUTF(customerName);
			 out.writeUTF(phoneNumber);
		 }
		 public void setCustomerID(String customerID){
			 this.customerID=customerID;
		 }
		 public void setCustomerName(String customerName){
			 this.customerName=customerName;			 
		 }
		 public void setPhoneNumber(String phoneNumber){
			 this.phoneNumber=phoneNumber;
		 }
		 public String toString(){
			 return this.customerID+","+this.customerName+","+this.phoneNumber; 
		 }
	}
	public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{
		Text result= new Text();
		 public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{
			result.set(value.toString());
			collector.collect(key, result);
		}
	}
	public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{
	    public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{
	    	 String str="";
	    	 while(values.hasNext()){
	    		  str+=values.next().toString();
	    	 }
	    	 output.collect(null, new Text(str));	
	  }
    }
	public static void main(String [] args) throws Exception{
		/**
		 * 从关系数据库读取数据到HDFS
		 */
		JobConf job = new JobConf();
		job.setJarByClass(SDBConnInput.class);
	    job.setOutputKeyClass(LongWritable.class);
	    job.setOutputValueClass(Text.class);
	    job.setInputFormat(DBInputFormat.class);
	    FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out"));
	    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
				"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
	    String fieldNames []={"customerID","customerName","phoneNumber"};
	    DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames);
	    job.setMapperClass(MapperClass.class);
	    job.setReducerClass(ReducerClass.class);
	    JobClient.runJob(job);
	}
}

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.*;

public class SDBConnOutput {
	public static class CustomerRecord implements Writable,DBWritable{
		String customerID;
		String customerName;
		String phoneNumber;
		public void readFields(ResultSet resultSet)  throws SQLException{
			customerID=resultSet.getString(1);
			customerName=resultSet.getString(2);
			phoneNumber=resultSet.getString(3);
			}
		public void write(PreparedStatement statement)  throws SQLException{
			statement.setString(1, customerID);
			statement.setString(2, customerName);
			statement.setString(3,phoneNumber);
		}
			 
		 public void readFields(DataInput in) throws IOException{
			 customerID=in.readUTF();
			 customerName=in.readUTF();
			 phoneNumber=in.readUTF();
		 }
		 public void write(DataOutput out) throws IOException{
			 out.writeUTF(customerID);
			 out.writeUTF(customerName);
			 out.writeUTF(phoneNumber);
		 }
		 public void setCustomerID(String customerID){
			 this.customerID=customerID;
		 }
		 public void setCustomerName(String customerName){
			 this.customerName=customerName;			 
		 }
		 public void setPhoneNumber(String phoneNumber){
			 this.phoneNumber=phoneNumber;
		 }
		 public String toString(){
			 return this.customerID+","+this.customerName+","+this.phoneNumber; 
		 }
	}
	public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{
		CustomerRecord customer=new CustomerRecord();
		 public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter)  throws IOException{
			 String [] strs=value.toString().split(",");
			customer.setCustomerID(strs[0]);
			customer.setCustomerName(strs[1]);
			customer.setPhoneNumber(strs[2]);
			collector.collect( customer,value);
		}
		
	}
	/**
	*将HDFS中的文件输出到数据库
	*/
	public static void main(String [] args) throws Exception{
	
		
		/**
		 * 从关系数据库读取数据到HDFS
		 */
		
		JobConf job = new JobConf(SDBConnInput.class);
		//DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中
	    job.setOutputFormat(DBOutputFormat.class);
	    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt"));
	    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
				"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
	    String fieldNames []={"customerID","customerName","phoneNumber"};
	    DBOutputFormat.setOutput(job, "customers", fieldNames);
	    job.setMapperClass(MapperClass.class);
	    job.setNumReduceTasks(0);
	    JobClient.runJob(job);
	}
}

注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称/lib,并且在提交job前,添加语句DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"),conf);
以上方法使用与所有需要额外jar包的MapReduce代码。
3
2
分享到:
评论

相关推荐

    hadoop基础教程

    Amazon网络服务提供的托管Hadoop集群的运行方式,以及它与用户直接管理的Hadoop集群有何区别, Hadoop与关系数据库的融合,使用Hive执行SQL查询,使用Sqoop迁移数据, 组成Hadoop生态系统的其他项目和工具,以及...

    基于Hadoop 及关系型数据库的海量数据分析研究

    Hadoop 可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理 ...本文对Hadoop 和关系型数据库进行了比较分析,讨论了将二者结 合构建海量数据分析系统的可行性,同时给出了实际的应用场景

    实验四:NoSQL和关系数据库的操作比较

    A.4实验四:NoSQL和关系数据库的操作比较 本实验对应第6章的内容。 A.4.1实验目的 (1)理解4种数据库(MySQL、HBase,Redis和 MongoDB)的概念及不同点。(2)熟练使用4种数据库操作常用的 Shell命令。 (3)熟悉4种...

    基于Hadoop及关系型数据库的海量数据分析研究

    Hadoop可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理...本文对Hadoop和关系型数据库进行了比较分析,讨论了将二者结合构建海量数据分析系统的可行性,同时给出了实际的应用场景。

    Hadoop集群之—MySQL关系数据库_V1.0

    Hadoop集群之—MySQL关系数据库_V1.0,很详细很好的Hadoop资料,与大家分享,下载后给个赞,谢谢!

    HADOOP学习

    1、 hadoop官方网站,首页会有最新动态。 2、 Nutch -&gt;谷歌GFS论文-&gt;doug 根据GFS设计了NDFS、06年启动hadoop项目。 3、 环境支持 操作系统 Linux 、WINDOWS-Cygwin、hadoop-for-windows JDK支持 下载jdk,解压jdk...

    大数据运维技术第9章 Sqoop组件安装配置.pptx

    当大数据存储和Hadoop生态系统的MapReduce,Hive,HBase等分析器出现时,他们需要一种工具来与关系数据库服务器进行交互,以导入和导出驻留在其中的大数据。Sqoop在Hadoop生态系统中占据一席之地,为关系数据库...

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第10期_MySQL关系数据库 共47页.pdf

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第10期_MySQL关系数据库 共47页.pdf

    Hadoop集群pdf文档

    Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2期)_机器信息分布表 Hadoop_Hadoop集群(第4期...Hadoop_Hadoop集群(第10期)_MySQL关系数据库 Web(Json-Lib类库使用手册)

    Hadoop集群(1-11期)

    Hadoop集群·CentOS安装配置(第1期) ...Hadoop集群·MySQL关系数据库(第10期) Hadoop集群·常用MySQL数据库命令(第10期副刊) Hadoop集群·HBase简介及安装(第11期) Hadoop集群·HBase之旅(第11期副刊)

    Hadoop集群(第10期)MySQL关系数据库汇编.pdf

    Hadoop集群(第10期)MySQL关系数据库汇编.pdf

    Hadoop HBase数据库简介

    HBase 和传统关系数据库不同,它采用了 BigTable 的数据模型增强的稀疏排序映射表(Key/Value ),其中,键由行关键字、列关键字和时间戳构成。 HBase 提供了对大规模数据的随机、实时读写访问。HBase 的目标是存储...

    sqoop-1.4.6.bin-hadoop-2.0.4-alpha版本的压缩包,直接下载到本地,解压后即可使用

    Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,...

    基于Hadoop和关系型数据库的电力用采大数据混合服务架构

    基于Hadoop和关系型数据库混合构架,提出新型用采数据服务架构。对平台的高可用性、系统监控、IaaS部署等进行分析,提升了系统 的可靠性,降低了运维难度。提出了可扩展的数据预处理过程和数据质量管理模型,保障了...

    sqoop-1.4.6.bin__hadoop-2.0.4-alpha安装包

    Sqoop是一个用于在Hadoop和关系数据库或大型机之间传输数据的工具。您可以使用Sqoop将关系数据库管理系统(RDBMS)中的数据导入Hadoop分布式文件系统(HDFS),转换Hadoop MapReduce中的数据,然后将数据导出回RDBMS...

    HADOOP课程大纲.pdf

    模块二 Hadoop生态系统介绍和演示 Hadoop HDFS 和 MapReduce Hadoop数据库之HBase Hadoop数据仓库之Hive Hadoop数据处理脚本Pig Hadoop数据接口Sqoop和Flume,Scribe DataX Hadoop工作流引擎 Oozie 运用Hadoop...

    基于Hadoop的大规模数据交换的研究

    另一方面,关系数据库对于结构化数据的快速查询的能力却是Hadoop 没有的。因此,企业 的数据总是存储于关系型数据库中,以满足快速查询的需要。尽管同时使用Hadoop 和关系 10 数据库可以弥补彼此的不足。然而令人...

    Hadoop相较于其他系统的优势-关系型数据库管理系统2022优秀文档.pptx

    Hadoop相较于其他系统的优势-关系型数据库管理系统2022优秀文档.pptx

    Hadoop实战中文版

    书籍目录: 第一部分 Hadoop——一种分布式编程框架 第1章 Hadoop简介 1.1 为什么写《Hadoop 实战》 1.2 什么是Hadoop 1.3 了解分布式系统和Hadoop 1.4 比较SQL 数据库和Hadoop 1.5 理解MapReduce 1.5.1 动手...

    Hadoop权威指南 第二版(中文版)

     本书是Hadoop权威参考,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装与运行Hadoop集群。 目录 第1章 初识Hadoop  数据!数据!  数据存储与分析  与其他系统相比  关系型数据库管理系统...

Global site tag (gtag.js) - Google Analytics