- 浏览: 249020 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
leibnitz:
有几点要请教下;a.在二阶段里有这样一句:引用例如如果一个 p ...
zookeeper源码学习 -
nettm:
不错,我也遇到了第一个问题
mongodb客户端错误集合 -
lingqi1818:
xiaoych 写道很好,研究了一年多了吧,哈哈 难得你上 ...
80x86系统启动原理 -
xiaoych:
很好,研究了一年多了吧,哈哈
80x86系统启动原理 -
pengpeng:
pengpeng 写道很强大。我觉得mas-slave那块可以 ...
分布式计算需求场景以及解决方案
最近有个需求要对mysql的全量数据迁移到hbase,虽然hbase的设计非常利于高效的读取,但是它的compaction实现对海量数据写入造成非常大的影响,数据到一定量之后,就开始抽风。
分析hbase的实现,不管其运行的机制,其最终存储结构为分布式文件系统中的hfile格式。
刚好hbase的源代码中提供一个HFileOutputFormat类,分析其源代码可以看到:
可以看到,它的工作流程就是首先根据你的配置文件初始化,然后写成hfile的格式。
这里我做了个偷懒的demo:
执行然之后,会在hdfs的/tmp目录下生成一份文件。注意批量写数据的时候一定要保证key的有序性
这个时候,hbase自己提供的一个基于jruby的loadtable.rb脚本就可以发挥作用了。
它的格式是loadtable.rb 你希望的表明 hdfs路径:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
执行完之后:
运行./hbase shell
>list
就会显示刚才导入的offer表了。
分析hbase的实现,不管其运行的机制,其最终存储结构为分布式文件系统中的hfile格式。
刚好hbase的源代码中提供一个HFileOutputFormat类,分析其源代码可以看到:
/** * Copyright 2009 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.mortbay.log.Log; /** * Writes HFiles. Passed KeyValues must arrive in order. * Currently, can only write files to a single column family at a * time. Multiple column families requires coordinating keys cross family. * Writes current time as the sequence id for the file. Sets the major compacted * attribute on created hfiles. * @see KeyValueSortReducer */ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> { public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); Configuration conf = context.getConfiguration(); final FileSystem fs = outputdir.getFileSystem(conf); // These configs. are from hbase-*.xml final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456); final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536); // Invented config. Add to hbase-*.xml if other than default compression. final String compression = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); return new RecordWriter<ImmutableBytesWritable, KeyValue>() { // Map of families to writers and how much has been output on the writer. private final Map<byte [], WriterLength> writers = new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR); private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); public void write(ImmutableBytesWritable row, KeyValue kv) throws IOException { long length = kv.getLength(); byte [] family = kv.getFamily(); WriterLength wl = this.writers.get(family); if (wl == null || ((length + wl.written) >= maxsize) && Bytes.compareTo(this.previousRow, 0, this.previousRow.length, kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) { // Get a new writer. Path basedir = new Path(outputdir, Bytes.toString(family)); if (wl == null) { wl = new WriterLength(); this.writers.put(family, wl); if (this.writers.size() > 1) throw new IOException("One family only"); // If wl == null, first file in family. Ensure family dir exits. if (!fs.exists(basedir)) fs.mkdirs(basedir); } wl.writer = getNewWriter(wl.writer, basedir); Log.info("Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written)); wl.written = 0; } kv.updateLatestStamp(this.now); wl.writer.append(kv); wl.written += length; // Copy the row so we know when a row transition. this.previousRow = kv.getRow(); } /* Create a new HFile.Writer. Close current if there is one. * @param writer * @param familydir * @return A new HFile.Writer. * @throws IOException */ private HFile.Writer getNewWriter(final HFile.Writer writer, final Path familydir) throws IOException { close(writer); return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir), blocksize, compression, KeyValue.KEY_COMPARATOR); } private void close(final HFile.Writer w) throws IOException { if (w != null) { StoreFile.appendMetadata(w, System.currentTimeMillis(), true); w.close(); } } public void close(TaskAttemptContext c) throws IOException, InterruptedException { for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) { close(e.getValue().writer); } } }; } /* * Data structure to hold a Writer and amount of data written on it. */ static class WriterLength { long written = 0; HFile.Writer writer = null; } }
可以看到,它的工作流程就是首先根据你的配置文件初始化,然后写成hfile的格式。
这里我做了个偷懒的demo:
HFileOutputFormat hf = new HFileOutputFormat(); HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml")); conf.set("mapred.output.dir", "/tmp"); conf.set("hfile.compression", Compression.Algorithm.LZO.getName()); TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID()); RecordWriter writer = hf.getRecordWriter(context); KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"), System.currentTimeMillis(), Bytes.toBytes("test")); KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"), System.currentTimeMillis(), Bytes.toBytes("123")); KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"), System.currentTimeMillis(), Bytes.toBytes("test")); KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"), System.currentTimeMillis(), Bytes.toBytes("123")); writer.write(null, kv); writer.write(null, kv1); writer.write(null, kv3); writer.write(null, kv4); writer.close(context);
执行然之后,会在hdfs的/tmp目录下生成一份文件。注意批量写数据的时候一定要保证key的有序性
这个时候,hbase自己提供的一个基于jruby的loadtable.rb脚本就可以发挥作用了。
它的格式是loadtable.rb 你希望的表明 hdfs路径:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
执行完之后:
运行./hbase shell
>list
就会显示刚才导入的offer表了。
评论
3 楼
Angel_Night
2012-03-26
解决了我的难题啊
十分感谢
十分感谢
2 楼
依然仰望天空
2011-11-04
很不错,学习
1 楼
pengpeng
2011-02-18
牛 ~~ 学习了
发表评论
-
spring mvc介绍
2014-06-29 18:18 821项目中用到了spring mvc,整理个文档给新手入门使用,欢 ... -
【转】GCC内嵌汇编
2012-07-26 15:37 972http://wenku.baidu.com/view/58f ... -
汇编和C相互调用
2012-06-06 11:28 987这里有几个原则: 1.调用者需要在调用前声明被调用者。 c的做 ... -
commons-io引起的ygc问题
2012-05-18 16:49 1308今天接到任务,图片上传服务器的性能有问题,高峰期间YGC频率在 ... -
mongodb客户端错误集合
2011-12-20 10:38 13497错误一: 调用代码: String map = "f ... -
jmeter java请求参数配置
2011-12-20 10:36 2131<JavaSampler guiclass=&quo ... -
openfire简介
2011-09-23 15:07 26882详细文章请下载附件。。。。。。 Openfire简介 ... -
计算机缓存漫谈
2011-06-27 16:36 982见附件 见附件 见附件 -
va_list和vsnprintf
2011-06-22 15:40 1197http://blog.sina.com.cn/s/blog_ ... -
memcached源代码分析
2011-06-17 11:12 4902目录 一. 概述... 3 二 ... -
[转]关于SASL的介绍文档
2011-05-20 11:11 2http://docs. ... -
jetty服务器性能调整过程分析
2011-05-13 10:27 2625见附件 见附件 见附件 -
【转】“INT 21H”指令说明及使用方法
2011-04-07 14:29 2326很多初学汇编语言的同学可能会对INT 21H这条指令感到困 ... -
hbase-0.20.6数据写入服务端代码性能瓶颈分析
2011-03-29 16:33 1733目前我的实际配置是4台8核CPU,装4个regionServe ... -
再见c3p0
2011-03-28 16:24 1147c3p0已经很久不维护了,以后java数据库连接池的代码打算都 ... -
无侵入,系统性能监测程序,配置简单,欢迎下载
2011-03-21 09:51 2919本外挂主要目的是对系 ... -
keepalive的来龙去脉
2011-03-02 09:35 4713今天有同事反应在性能测试环境cpu load很高有500多,我 ... -
深入浅出IO程序设计—序
2011-02-24 10:31 1294作为一个程序员,除了 ... -
hbase&hadoop初探
2011-02-17 10:44 928见附件。。。 -
服务端到手机端的推送方式
2011-02-11 11:13 12691.无连接的方式 前提条件服务端知道客户端的IP地址,并且客户 ...
相关推荐
本文当是一个基于HBase的海量数据的实实时查询系统的原理分析。详细的介绍了大数据查询的原理。
从HBase的集群搭建、HBaseshell操作、java编程、架构、原理、涉及的数据结构,并且结合陌陌海量消息存储案例来讲解实战HBase 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为...
为解决现有的HBase数据压缩策略选择方法未考虑数据的冷热性,以及在选择过程中存在片面性和不可靠性的缺陷,提出了基于HBase数据分类的压缩策略选择方法。依据数据文件的访问频度将HBase数据划分为冷热数据,并限定具体...
本文档详细的描述了如何采用HBase存储海量图片,以及如何将大批量的小文件写成sequenceFile文件格式。
│ Day15[Hbase 基本使用及存储设计].pdf │ ├─02_视频 │ Day1501_Hbase的介绍及其发展.mp4 │ Day1502_Hbase中的特殊概念.mp4 │ Day1503_Hbase与MYSQL的存储比较.mp4 │ Day1504_Hbase部署环境准备.mp4 │ Day...
HIVE建表时可以指定映射关系直接读取HBASE的数据,相当于有了一个HBASE向HIVE的通道。那HIVE向HBASE有通道吗?本文主要讲述了Hive库数据如何入到HBASE中。
项目中自己用的,弄了很久,需要把数据从各种数据源导入到,我们的原始数据库,原始数据库采用hbase,来存储所有数据,那么这里就用的nifi,从其他数据源获取数据以后,然后导入到Hbase中去,这个是设计好的流程模板. ...
springboot搭建的hbase可视化界面 支持hbase的建表与删除 支持根据rowkey查询数据
Hbase笔记 —— 利用JavaAPI的方式操作Hbase数据库(往hbase的表中批量插入数据)
使用spark读取hbase中的数据,并插入到mysql中
kettle集群搭建以及使用kettle将mysql数据转换为Hbase数据
HBASE的一个读取数据流程的解析,清晰的画出整个过程,十分有利于理解
python 连接hbase 打印数据。hbase 的一些源数据未转化
tsv格式的数据库测试文件,hbase可以通过采用importtsv导入外部数据到hbase中
HBase基本数据操作详解,分享给大家!
分布式数据库HBase在大规模数据加载中较传统关系型数据库有较大的优势但也存在很大的优化空间.基于Hadoop分布式平台搭建HBase环境,并优化自定义数据加载算法.首先,分析HBase底层数据存储,实验得出HBase自带数据加载...
MySQL通过sqoop工具用命令将数据导入到hbase的代码文件
介绍了大数据平台如何将hdfs中的分布式文件导入hbase 。源代码在cloudera-SCM 的cdh 4.8.1产品环境中验证通过。
hbase备份和数据恢复,hbase与hive的互导,hbase和hdfs互导。