package com.sun.mysql; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; /** * 将mapreduce的结果数据写入mysql中 * @author asheng */ public class WriteDataToMysql { /** * 重写DBWritable * @author asheng * TblsWritable需要向mysql中写入数据 */ public static class TblsWritable implements Writable, DBWritable { String tbl_name; String tbl_type; public TblsWritable() { } public TblsWritable(String tbl_name,String tab_type) { this.tbl_name = tbl_name; this.tbl_type = tab_type; } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setString(2, this.tbl_type); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_type = resultSet.getString(2); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.tbl_name); out.writeUTF(this.tbl_type); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = in.readUTF(); this.tbl_type = in.readUTF(); } public String toString() { return new String(this.tbl_name + " " + this.tbl_type); } } public static class ConnMysqlMapper extends Mapper<LongWritable,Text,Text,Text> //TblsRecord是自定义的类型,也就是上面重写的DBWritable类 { public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException { //<首字母偏移量,该行内容>接收进来,然后处理value,将abc和x作为map的输出 //key对于本程序没有太大的意义,没有使用 String name = value.toString().split(" ")[0]; String type = value.toString().split(" ")[1]; context.write(new Text(name),new Text(type)); } } public static class ConnMysqlReducer extends Reducer<Text,Text,TblsWritable,TblsWritable> { public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException { //接收到的key value对即为要输入数据库的字段,所以在reduce中: //wirte的第一个参数,类型是自定义类型TblsWritable,利用key和value将其组合成TblsWritable, 然后等待写入数据库 //wirte的第二个参数,wirte的第一个参数已经涵盖了要输出的类型,所以第二个类型没有用,设为null for(Iterator<Text> itr = values.iterator();itr.hasNext();) { context.write(new TblsWritable(key.toString(),itr.next().toString()),null); } } } public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduce_test", "root", "root"); Job job = new Job(conf,"test mysql connection"); job.setJarByClass(ReadDataFromMysql.class); job.setMapperClass(ConnMysqlMapper.class); job.setReducerClass(ConnMysqlReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(DBOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); DBOutputFormat.setOutput(job, "lxw_tabls", "TBL_NAME","TBL_TYPE"); System.exit(job.waitForCompletion(true) ? 0 : 1); } } //执行输入参数为/home/asheng/hadoop/in/test3.txt //test3.txt中的内容为 /* abc x def y chd z */ //即将abc x分别做为TBL_NAME,和TBL_TYPE插入数据库中 //输出结果在mysql数据库中查看 //select * from lxw_tabls; //发现新增三行 /* abc x def y chd z */
相关推荐
使用MapReduce读取hbase数据库中千万级别的数据,处理数据并统计,将统计后的结果存入mysql
mapreduce-db-operatmapreduce实现数据从hdfs到mysql之间的相互传递
利用hadoop的mapreduce把oracle/mysql中的数据导入到hbase和hdfs中的两个java程序
从hdfs进行mapreduce数据导入到数据库 hadoop连接数据库查询数据,并添加到hdfs;从hdfs进行mapreduce数据导入到数据库 hadoop连接数据库查询数据,并添加到hdfs;从hdfs进行mapreduce数据导入到数据库
18、MapReduce的计数器与通过MapReduce读取_写入数据库示例 ...本文介绍MapReduce的计数器使用以及...本文的前提依赖是hadoop可正常使用、mysql数据库中的表可用且有数据。 本文分为2个部分,即计数器与读写mysql数据库。
通过爬虫代码获取Java岗位相关数据,并利用MapReduce进行数据处理和分析,最终通过前后端分离展示在一个大屏上。 后端方面使用SpringBoot集成JPA撰写接口,提供快速启动部署能力;前端方面,使用的Vue2脚手架,结合...
以落地方式从mysql数据库读取数据到HBASE中的mapreduce程序
扩展: 1、数据处理主要技术 Sqoop:作为⼀款开源的离线数据传输⼯具,主要⽤于Hadoop(Hive) 与传统数据库(MySql,PostgreSQL)间的数据传递。它可以 将⼀个关系数据库中数据导⼊Hadoop的HDFS中,也可以将HDFS中的...
本次要实践的数据日志来源于国内某技术学习论坛,该论坛由某培训机构主办,汇聚...使用Sqoop把Hive产生的统计结果导出到mysql中; 两个日志文件,一共有200MB,符合大数据量级,可以作为推荐系统数据集和hadoop测试集。
包含了idea代码关于使用mapreduce清洗数据,以及上传数据到HDFS。 包含了hive创表以及sqoop导出数据到MySQL。
采集电影名称、电影简介、电影评分、其他信息、电影连接等字段,抓取电影票房总收入排名情况(取前20),删除冗余和空值字,利用Python的PyMysql库连接本地Mysql数据库并导入movies表,可以将数据保存到本地,从而...
Hive是一个数据仓库,它部署在Hadoop集群上,它的数据是存储在HDFS上的,Hive所建的表在HDFS上对应...HIve特别神奇的地方是我们只需写一条SQL语句它就会自动转换为MapReduce任务去执行,不用我们再手动去写MapReduce了
传统关系数据库,如:Oracle、MYSQL 无法储存几亿行长,几百万行宽的表格,巨大的数据直接导致数据库崩溃 半结构化数据和脏数据将会导致出错(类型不严格) 传统方法失效 ! 如何解决? 大数据处理技术的解决办法:...
《数据算法:Hadoop/Spark大数据处理技巧》介绍了很多基本设计模式、优化技术和数据挖掘及机器学习解决方案,以解决生物信息学、基因组学、统计和社交网络分析等领域的很多问题。这还概要介绍了MapReduce、Hadoop和...
本项目以电影数据为主题,基于hadoop伪分布式搭建,结合hive数据仓库调用物理机mysql数据库实现电影相关数据统计、通过Mapreduce编程对hdfs文件系统的文件进行词频统计。使用python进行电影数据采集、处理、分析及...
本项目以电影数据为主题,基于hadoop伪分布式搭建,结合hive数据仓库调用物理机mysql数据库实现电影相关数据统计、通过Mapreduce编程对hdfs文件系统的文件进行词频统计。使用python进行电影数据采集、处理、分析及...
技术点4 使用MapReduce 将数据导入数据库 技术点5 使用Sqoop 从MySQL 导入数据 2.2.4 HBase 技术点6 HBase 导入HDFS 技术点7 将HBase 作为MapReduce 的数据源 2.3 将数据导出Hadoop 2.3.1 将数据...
数 据 层 结构化业务数据、机器数据 半结构化数据、机器数据 序列化 算法库 机器学习 Storm内存 流式计算框架 Hadoop MapReduce 计算框架 Spark 并行计算框架 计 算 层 运营 分析 日志 分析 个性化 推荐 供应链 分析...
2.2.3 从数据库中拉数据技术点4 使用MapReduce 将数据导入数据库 技术点5 使用Sqoop 从MySQL 导入数据 2.2.4 HBase 技术点6 HBase 导入HDFS 技术点7 将HBase 作为MapReduce 的数据源2.3 将数据导出...