package cmd;
/**
* MapReduce job that reads a file from HDFS and writes each record into HBase
* with HTable.put(put) directly in the map phase; there is no reduce phase.
*/
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseImport extends Configured implements Tool {
static final Log LOG = LogFactory.getLog(HBaseImport.class);
public static final String JOBNAME = "MRImport";
public static class Map extends
Mapper<LongWritable, Text, NullWritable, NullWritable> {
Configuration configuration = null;
HTable xTable = null;
private boolean wal = true;
static long count = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
super.cleanup(context);
xTable.flushCommits();
xTable.close();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] all = value.toString().split("\t");
// Skip malformed lines; each input line is expected to be "rowKey<TAB>value".
if (all.length != 2) {
return;
}
Put put = new Put(Bytes.toBytes(all[0]));
put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"),
Bytes.toBytes(all[1]));
if (!wal) {
put.setWriteToWAL(false);
}
xTable.put(put);
if ((++count % 100) == 0) {
context.setStatus(count + " DOCUMENTS done!");
context.progress();
System.out.println(count + " DOCUMENTS done!");
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
configuration = context.getConfiguration();
xTable = new HTable(configuration, "testKang");
xTable.setAutoFlush(false);
xTable.setWriteBufferSize(12 * 1024 * 1024);
wal = true;
}
}
@Override
public int run(String[] args) throws Exception {
String input = args[0];
Configuration conf = HBaseConfiguration.create(getConf());
conf.set("hbase.master", "m0:60000");
Job job = new Job(conf, JOBNAME);
job.setJarByClass(HBaseImport.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
int res = 1;
try {
res = ToolRunner.run(conf, new HBaseImport(), otherArgs);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
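To sanity-check the import, a minimal read-back against the same testKang table might look like the sketch below. It uses the same 0.9x-era HTable client API as the job above; the class name HBaseImportCheck and the row key "row1" are placeholders, not part of the original program.
package cmd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper class, not part of the original job: reads one row back
// from the "testKang" table to verify that the MapReduce import succeeded.
public class HBaseImportCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testKang");
        // "row1" is only a placeholder; use a row key that actually occurs in the input file
        Get get = new Get(Bytes.toBytes("row1"));
        Result result = table.get(get);
        byte[] val = result.getValue(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"));
        System.out.println(val == null ? "row not found" : Bytes.toString(val));
        table.close();
    }
}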
package data2hbase;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class DataToHBase {
private static final String TABLE_NAME = "TrafficInfo";
private static final String FAMILY_NAME = "cf";
private static final String INPUT_PATH = "hdfs://hadoop.master:9000/traffic_in.dat";
// private static final String OUT_PATH = "hdfs://hadoop.master:9000/traffic_out.dat";
public static void main(String[] args) throws Exception {
// Create the HBase table
Configuration conf = new Configuration();
// conf.set("hbase.rootdir", "hdfs://hadoop.master:9000/hbase");
// Required when running from Eclipse; without it the ZooKeeper quorum cannot be located
// conf.set("hbase.zookeeper.quorum",
// "hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
Configuration cf = HBaseConfiguration.create(conf);
HBaseAdmin hbaseAdmin = new HBaseAdmin(cf);
boolean tableExists = hbaseAdmin.tableExists(TABLE_NAME);
if (tableExists) {
hbaseAdmin.disableTable(TABLE_NAME);
hbaseAdmin.deleteTable(TABLE_NAME);
System.err.println("............del table: " + TABLE_NAME);
}
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME.getBytes());
desc.addFamily(family);
hbaseAdmin.createTable(desc);
System.err.println(".................create table: " + TABLE_NAME);
// 1.1
conf = new Configuration();
// // Set the ZooKeeper quorum
// conf.set("hbase.zookeeper.quorum",
// "hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
// Set the name of the target HBase table
conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
conf.set("dfs.socket.timeout", "180000");
Job job = new Job(conf, DataToHBase.class.getName());
job.setJarByClass(DataToHBase.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
// 1.2
job.setMapperClass(BatchImportMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 1.3
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// 1.4
// job.setGroupingComparatorClass(cls);
// 1.5
// job.setCombinerClass(cls)
// 2.1
// 2.2
job.setReducerClass(BatchImportReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TableOutputFormat.class);
// 2.3
// FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.waitForCompletion(true);
// Only the bulk import is needed here, so no file output path is set
}
// static class BatchImportMapper extends TableMapper<Text, Text> {
// protected void map(
// org.apache.hadoop.hbase.io.ImmutableBytesWritable key,
// org.apache.hadoop.hbase.client.Result value,
// org.apache.hadoop.mapreduce.Mapper<org.apache.hadoop.hbase.io.ImmutableBytesWritable,
// org.apache.hadoop.hbase.client.Result, Text, Text>.Context context)
// throws java.io.IOException, InterruptedException {
// };
// }
static class BatchImportMapper extends
Mapper<LongWritable, Text, Text, Text> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(
"yyyyMMddHHmmss");
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
String time = split[0].trim();
System.err.println("=="+time+"==");
String formatDate = simpleDateFormat.format(new Date(Long
.parseLong(time)));
context.write(new Text(split[1]), new Text(split[1] + ":"
+ formatDate + "\t" + value.toString()));
}
}
static class BatchImportReducer extends TableReducer<Text, Text, Text> {
@Override
protected void reduce(Text k2, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException {
for (Text text : v2s) {
String[] split = text.toString().split("\t");
String tableRowKey = split[0].trim();
String phoneNum = split[2];
String upPackNum = split[7];
String downPackNum = split[8];
String upPayLoad = split[9];
String downPayLoad = split[10];
String host = split[5];
Put put = new Put(tableRowKey.getBytes());
put.add(FAMILY_NAME.getBytes(), "phoneNum".getBytes(),
phoneNum.getBytes());
put.add(FAMILY_NAME.getBytes(), "upPackNum".getBytes(),
upPackNum.getBytes());
put.add(FAMILY_NAME.getBytes(), "downPackNum".getBytes(),
downPackNum.getBytes());
put.add(FAMILY_NAME.getBytes(), "upPayLoad".getBytes(),
upPayLoad.getBytes());
put.add(FAMILY_NAME.getBytes(), "downPayLoad".getBytes(),
downPayLoad.getBytes());
put.add(FAMILY_NAME.getBytes(), "host".getBytes(),
host.getBytes());
context.write(new Text(tableRowKey), put);
// HTable htable=new HTable(new Configuration(),TABLE_NAME);
// htable.put(put);
//
// context.write(new Text(tableRowKey), new Text(text));
}
}
}
}
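After the job completes, the imported rows can be inspected with a plain scan. The sketch below is only a verification helper built on the same old-style HTable client API as the code above; the class name TrafficInfoScan is hypothetical, and the column names are the ones written by BatchImportReducer.
package data2hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical verification class, not part of the original import job:
// scans the "TrafficInfo" table and prints a few of the columns written by the reducer.
public class TrafficInfoScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "TrafficInfo");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            String rowKey = Bytes.toString(r.getRow());
            String phoneNum = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("phoneNum")));
            String host = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("host")));
            System.out.println(rowKey + "\t" + phoneNum + "\t" + host);
        }
        scanner.close();
        table.close();
    }
}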