package com.sun.mysql; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 从mysql中读数据(结果存放在HDFS中)然后经mapreduce处理 * @author asheng */ public class ReadDataFromMysql { /** * 重写DBWritable * @author asheng * TblsRecord需要从mysql读取数据 */ public static class TblsRecord implements Writable, DBWritable { String tbl_name; String tbl_type; public TblsRecord() { } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setString(2, this.tbl_type); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_type = resultSet.getString(2); } @Override public void write(DataOutput out) throws IOException { Text.writeString(out, this.tbl_name); Text.writeString(out, this.tbl_type); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = Text.readString(in); this.tbl_type = Text.readString(in); } public String toString() { return new String(this.tbl_name + " " + this.tbl_type); } } /** * Mapper * @author asheng * 下面的类中的Mapper一定是包org.apache.hadoop.mapreduce.Mapper;下的 */ public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text> //TblsRecord是自定义的类型,也就是上面重写的DBWritable类 { public void map(LongWritable key,TblsRecord values,Context context)throws IOException, InterruptedException { //只是将从数据库读取进来数据转换成Text类型然后输出给reduce context.write(new Text(values.tbl_name), new Text(values.tbl_type)); } } /** * Reducer * @author asheng * 下面的类中的Reducer一定是包org.apache.hadoop.mapreduce.Reducer;下的 */ public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> { public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException { //循环遍历并写入相应的指定文件中 for(Iterator<Text> itr = values.iterator();itr.hasNext();) { context.write(key, itr.next()); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduce_test", "root", "root"); Job job = new Job(conf,"test mysql connection"); job.setJarByClass(ReadDataFromMysql.class); job.setMapperClass(ConnMysqlMapper.class); job.setReducerClass(ConnMysqlReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(DBInputFormat.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/user/lxw/output/")); //对应数据库中的列名 String[] fields = { "TBL_NAME", "TBL_TYPE" }; //setInput方法六个参数分别的含义: //1.Job;2.Class<? extends DBWritable>按照什么类型读取的 //3.表名;4.where条件 //5.order by语句;6.列名所组成的数组 DBInputFormat.setInput(job, TblsRecord.class,"lxw_tabls", "TBL_NAME like 'lxy%'", "TBL_NAME", fields); System.exit(job.waitForCompletion(true) ? 0 : 1); //本程序表示从mysql数据库mapreduce_test的表lxw_tabls中查询处列TAB_NAME为lxy开头的数据并放入hdfs中 //执行完后的查看bin/hadoop fs -cat /user/lxw/output/part-r-00000 /*结果 lxyae lxyaccg lxybf */ } } /* mysql> select * from lxw_tabls; +----------+----------+ | TBL_NAME | TBL_TYPE | +----------+----------+ | zhao | a | | qian | b | | sun | c | | li | d | | lxya | e | | lxyb | f | | lxyacc | g | +----------+----------+ 7 rows in set (0.00 sec) */
相关推荐
maven 项目操作MapReduce读取单词案例,运行方式:通过maven 生成jar 包, 上传到hadoop 服务器,通过hadoop jar 生成的jar包 运行,无需上传参数
18、MapReduce的计数器与通过MapReduce读取_写入数据库示例 网址:https://blog.csdn.net/chenwewi520feng/article/details/130454774 本文介绍MapReduce的计数器使用以及自定义计数器、通过MapReduce读取与写入...
mapreduce案例测试数据
包中含有hadoop-eclipse-plugin-2.6.0.jar ,hadoop.dll,winutils.exe 三个文件,是windows 运行mapreduce 的配置文件。hadoop2.8.1亲测可用
MapReduce学习笔记,呕心沥血写出来的,里面有很多经验 MapReduce学习笔记,呕心沥血写出来的,里面有很多经验 MapReduce学习笔记,呕心沥血写出来的,里面有很多经验
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
使用MapReduce读取hbase数据库中千万级别的数据,处理数据并统计,将统计后的结果存入mysql
MapReduce--->实现简单的数据清洗需要的数据文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件 网址:https://blog.csdn.net/chenwewi520feng/article/details/130456088 本文的前提是hadoop环境正常。 本文最好和MapReduce操作常见...
4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。
mapreduce解析网络日志文件(或从mysql数据库获取记录)并计算相邻日志记录间隔时长
mapreduce基本数据读取,通俗易懂。 此项目情景为,遗传关系族谱。 (爷爷 、父母 、孩子) 经典案例
Mapreduce实验报告 前言和简介 MapReduce是Google提出的一种编程模型,在这个模型的支持下可以实现大规模并行化计 算。在Mapreduce框架下一个计算机群通过统一的任务调度将一个巨型任务分成许多部分 ,分别解决然后...
MapReduce发明人关于MapReduce的介绍
【MapReduce篇07】MapReduce之数据清洗ETL1
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
Hadoop 用mapreduce实现Wordcount实例,绝对能用
图解MapReduce,系统介绍Hadoop MapReduce工作过程原理
hadoop网站通过SVN下载下来的mapreduce代码。欢迎现在学习!