<!-- Hadoop / HBase client and server artifacts needed to compile the MapReduce jobs below. -->
<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.96.2-hadoop2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.96.2-hadoop2</version>
  </dependency>
</dependencies>
package com.abloz.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableInputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import org.apache.hadoop.mapred.lib.db.DBWritable; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @SuppressWarnings("deprecation") public class CopyToMysql extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(CopyToMysql.class); public static final String driverClassName = "com.mysql.jdbc.Driver"; public static final String URL = "jdbc:mysql://Hadoop48/toplists"; public static final String USERNAME = "root";//mysql username public static final String PASSWORD = "";//mysql password private static final String tableName="myaward"; private Connection connection; public static class AwardInfoRecord implements Writable, DBWritable { String userid; 
String nick; String loginid; public AwardInfoRecord() { } public void readFields(DataInput in) throws IOException { this.userid = Text.readString(in); this.nick = Text.readString(in); this.loginid = Text.readString(in); } public void write(DataOutput out) throws IOException { Text.writeString(out,this.userid); Text.writeString(out, this.nick); Text.writeString(out, this.loginid); } public void readFields(ResultSet result) throws SQLException { this.userid = result.getString(1); this.nick = result.getString(2); this.loginid = result.getString(3); } public void write(PreparedStatement stmt) throws SQLException { stmt.setString(1, this.userid); stmt.setString(2, this.nick); stmt.setString(3, this.loginid); } public String toString() { return new String(this.userid + " " + this.nick +" " +this.loginid); } } public static Configuration conf; public static class MyMapper extends MapReduceBase implements Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable> { @Override public void map(ImmutableBytesWritable key, Result rs, OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> output, Reporter report) throws IOException { String rowkey = new String(key.get()); String userid = new String(rs.getValue("info".getBytes(), "UserId".getBytes())); String nick = new String(rs.getValue("info".getBytes(), "nickName".getBytes()),HConstants.UTF8_ENCODING); String loginid = new String(rs.getValue("info".getBytes(), "loginId".getBytes())); output.collect(new ImmutableBytesWritable(userid.getBytes()),new ImmutableBytesWritable((nick+","+loginid).getBytes())); //LOG.info("map: userid:"+userid+",nick:"+nick); } @Override public void configure(JobConf job) { super.configure(job); } } public static class MyReducer extends MapReduceBase implements Reducer<ImmutableBytesWritable, ImmutableBytesWritable,AwardInfoRecord, Text>{ @Override public void configure(JobConf job) { super.configure(job); } @Override public void reduce(ImmutableBytesWritable key, 
Iterator<ImmutableBytesWritable> it, OutputCollector<AwardInfoRecord, Text> output, Reporter report) throws IOException { AwardInfoRecord record = new AwardInfoRecord(); record.userid=new String(key.get()); String info = new String(it.next().get()); record.nick = new String(info.split(",")[0]); record.loginid = new String(info.split(",")[1]); //LOG.debug("reduce: userid:"+record.userid+",nick:"+record.nick); output.collect(record, new Text()); } } public static void main(String[] args) throws Exception { conf = HBaseConfiguration.create(); int ret = ToolRunner.run(conf, new CopyToMysql(), args); System.exit(ret); } @Override public int run(String[] args) throws Exception { createConnection(driverClassName, URL); JobControl control = new JobControl("mysql"); JobConf job = new JobConf(conf,CopyToMysql.class); job.setJarByClass(CopyToMysql.class); String fromTable = "award"; job.set("mapred.input.dir", fromTable); job.set("hbase.mapred.tablecolumns", "info:UserId info:nickName info:loginId"); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(ImmutableBytesWritable.class); job.setInputFormat(TableInputFormat.class); DBConfiguration.configureDB(job, driverClassName, URL, USERNAME, PASSWORD); String[] fields = {"userid","nick","loginid"}; DBOutputFormat.setOutput(job, tableName, fields); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(1); Job controlJob = new Job(job); control.addJob(controlJob); //JobClient.runJob(job); //control.run(); Thread theController = new Thread(control); theController.start(); //final while(!control.allFinished()){ Thread.sleep(3000); System.out.print("."); } control.stop(); System.out.println(); LOG.info("job end!"); return 0; } //connect private void createConnection(String driverClassName, String url) throws Exception { Class.forName(driverClassName); connection = DriverManager.getConnection(url,USERNAME,PASSWORD); connection.setAutoCommit(false); } //create 
table fast private void createTable(String tableName) throws SQLException { String createTable = "CREATE TABLE " +tableName+ " (userid VARCHAR(9) NOT NULL," + " nick VARCHAR(20) NOT NULL, " + " loginid VARCHAR(20) NOT NULL, " + " PRIMARY KEY (userid, caldate))"; Statement st = connection.createStatement(); try { st.executeUpdate(createTable); connection.commit(); } catch (Exception e) { LOG.warn("table '"+tableName+"' is already exist! so we do anything"); } finally { st.close(); } } //init // private void initialize() throws Exception { // if(!this.initialized) { // createConnection(driverClassName, URL); //// dropTables(tableName); // createTable(tableName); // System.out.println("------------------create ----------------------"); // this.initialized = true; // } // } }
更简单的代码:
package com.my.hbase; /** * Created by foreverhui on 2015/1/16. */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class FromHBaseToMysqlExample { public static class HBaseMapper extends TableMapper<ImmutableBytesWritable, Text>{ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { for(Cell kv:value.rawCells()){ //Text out=new Text(Bytes.toString(kv.getFamilyArray())+"|"+Bytes.toString(kv.getQualifierArray())+"|"+Bytes.toString(kv.getValueArray())); String primaryKey=Bytes.toString(kv.getRowArray()); String dataRow=Bytes.toString(kv.getValueArray()); //todo //解析 dataRow insert into mysql //context.write(new ImmutableBytesWritable(kv.getRowArray()), out); } } } /** * @param args */ public static void main(String[] args)throws Exception { Configuration conf=HBaseConfiguration.create(); conf.set("from.table", "testtable"); //conf.set("family", "family1"); Job job=new Job(conf,"hbase to hbase"); job.setJarByClass(FromHBaseToMysqlExample.class); TableMapReduceUtil.initTableMapperJob(conf.get("from.table"), new Scan(), HBaseMapper.class,ImmutableBytesWritable.class, Text.class, job); System.exit(job.waitForCompletion(true)?0:1); } }
相关推荐
使用spark读取hbase中的数据,并插入到mysql中
该案例中主要使用MapReduce作为处理组件进行数据处理,实现的案例有如通过javaapi实现hbase数据写入hdfs、hbase表数据复制到另一个表中等操作 对应(《HBase分布式存储系统应用》胡鑫喆 张志刚著)教材中案例
hadoop1.1.2操作例子 包括hbase hive mapreduce相应的jar包
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
HBase MapReduce完整实例.rar
kettle集群搭建以及使用kettle将mysql数据转换为Hbase数据
根据mysql中数据库配置表信息查询mysql中数据,将部分处理为json格式,上传到hbase中。
Eclipse工程 HBase MapReduce完整实例 可远程执行 包含HBase增删改查 执行Test可看到效果
java操作Hbase之从Hbase中读取数据写入hdfs中源码,附带全部所需jar包,欢迎下载学习。
#资源达人分享计划#
NULL 博文链接:https://jsh0401.iteye.com/blog/2096103
hbase 命令的基本操作步骤，1.熟悉使用HBase操作常用的Shell命令。 2.学会表和族的属性操作。 3.学会Filter操作。 4.学会时间戳和数据版本的操作。 5.学会数据批量导入。
HBase的 HBase MapReduce投影
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
│ Day16[Hbase 企业应用及与MapReduce集成].pdf ├─02_视频 │ Day1601_Hbase Java API-环境配置.mp4 │ Day1603_Hbase Java API-put、delete.mp4 │ Day1604_Hbase Java API-Scan和过滤器.mp4 │ Day1605_Hbase...
hbase-rdd, 从HBase读取并写入 RDD ? 这个项目允许将 Apache Spark 连接到 HBase 。 现在可以在 Scala 2.10和版本 2.11上使用Spark和 CDH5.0. 版本,在版本/版本 0.2.2-SNAPSHOT 工作时使用Spark和版本
人工智能-hadoop
利用hadoop的mapreduce把oracle/mysql中的数据导入到hbase和hdfs中的两个java程序
MySQL通过sqoop工具用命令将数据导入到hbase的代码文件