1.LoadIncrementalHFiles.doBulkLoad(Path hfofDir, HTable table)
首先用discoverLoadQueue方法扫描出hfofDir下有哪些fhile文件,再循环调用tryLoad方法把每个文件load进去,这是一个串行的过程。
Deque<LoadQueueItem> queue = null;
queue = discoverLoadQueue(hfofDir);
while (!queue.isEmpty()) {
LoadQueueItem item = queue.remove();
tryLoad(item, conn, table.getTableName(), queue);
}
2.LoadIncrementalHFiles.discoverLoadQueue(Path hfofDir)
hfofDir下是两层目录结构family-->hfile,因此二重循环遍历每个hfile文件,加到Deque里返回,hfofDir下以"_"开头的不是family目录。
LoadQueueItem的数据结构用于记录family和hfile
final byte[] family;
final Path hfilePath;
FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);//第一层目录
Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
for (FileStatus stat : familyDirStatuses) {
Path familyDir = stat.getPath();
// Skip _logs, etc
if (familyDir.getName().startsWith("_")) continue;//以"_"开头的不是family目录。
byte[] family = familyDir.getName().getBytes();
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));//第二层目录
for (Path hfile : hfiles) {
if (hfile.getName().startsWith("_")) continue;
ret.add(new LoadQueueItem(family, hfile));
}
}
3.LoadIncrementalHFiles.tryLoad(final LoadQueueItem item,
HConnection conn, final byte[] table,
final Deque<LoadQueueItem> queue)
首先检查当前的hfile所属的region是否已经发生分裂,如果发生分裂,则将hfile分裂成匹配新region的两个hfile,并将这两个hfile放入deque;哪果没有发生分裂,则调用region所在server的bulkLoadHFile方法将hfile导入。重点是以下几句
if (!hri.containsRange(first, last)) {//判断包含firstkey的当前region是否包含hfile的startkey和endkey,如果不包含说明当前region是分裂过的
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
botOut, topOut);//以当前region的endkey为中值分裂hfile为两个,文件存为.bottom和.top
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
//.bottom和.top重新放回queue
queue.addFirst(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut));
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
return null;
}
byte[] regionName = location.getRegionInfo().getRegionName();
server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);//如果包含,直接调用region所在server的bulkLoadHFile方法将hfile导入
return null;
4.LoadIncrementalHFiles.splitStoreFile(
Configuration conf, Path inFile,
HColumnDescriptor familyDesc, byte[] splitKey,
Path bottomOut, Path topOut)
//以splitKey为中值,将inFile拷贝分裂为bottomOut和topOut两个文件
Reference topReference = new Reference(splitKey, Range.top);
Reference bottomReference = new Reference(splitKey, Range.bottom);
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
5.HRegionServer.bulkLoadHFile(String hfilePath, byte[] regionName,
byte[] familyName)
这是个HRegionInterface下的远程调用,是在regionserver中执行的。
checkOpen();//检查region是否已停,已经停了便不再导
HRegion region = getRegion(regionName);//从regionserver中拿到region
region.bulkLoadHFile(hfilePath, familyName);//这步才开始导
6.HRegion.bulkLoadHFile(String hfilePath, byte[] familyName)
throws IOException {
startRegionOperation();//上读锁
try {
Store store = getStore(familyName);
if (store == null) {
throw new DoNotRetryIOException(
"No such column family " + Bytes.toStringBinary(familyName));
}
store.bulkLoadHFile(hfilePath);//调store的同名方法
} finally {
closeRegionOperation();//解读锁
}
}
7.Store.bulkLoadHFile(String srcPathStr)
就三步:首先将hfile重命名到store的目录下;其次将hfile包装成StoreFile对象装载到Store的列表里。在这两步之前是再一次检查region的startkey和endkey是否跟hfile的匹配
//再次检查是否匹配region
HRegionInfo hri = region.getRegionInfo();
if (!hri.containsRange(firstKey, lastKey)) {
throw new WrongRegionException(
"Bulk load file " + srcPathStr + " does not fit inside region "
+ this.region);
}
//挪文件
Path srcPath = new Path(srcPathStr);
Path dstPath = StoreFile.getRandomFilename(fs, homedir);
LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
StoreFile.rename(fs, srcPath, dstPath);
//装载文件以提供在线服务
// Append the new storefile into the list
this.lock.writeLock().lock();//加store的写锁
try {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
newFiles.add(sf);
this.storefiles = sortAndClone(newFiles);
notifyChangedReadersObservers();
} finally {
this.lock.writeLock().unlock();//解store的写锁
}
分享到:
相关推荐
博客配套文件,详细演示了如何通过MR程序的方式bulkload数据到hbase,代码可直接用于生产环境。
《hbase权威指南》随书示例源代码.方便学习
hbase0.94java源代码 希望对大家有帮助
VC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 ...
hbase权威指南书中的源代码, hbase the definitive guide 源代码
hbase权威指南 源代码 英文
hbase权威指南.源代码.绝对经典。。 下载后评分+评论,即可返回分数.
Hbase权威指南 随书源代码 源码包 绝对完整版 maven工程,带pom文件,可以直接作为一个完整工程导入eclipse等ide。
通过使用 WAL 和缓冲的 Put 从 Hdfs 文件中摄取 HBase 记录 通过 WAL(使用 Put)将具有 PARQUET 格式的 hdfs 文件加载到 Hbase 的包。 该包基于仅使用 Mapper 加载表。 很快我将添加如何使用 reducer 以及使用 MR ...
hbase操作必备客户端源代码
工作中遇到的问题 即 bulkload 实现hive 到Hbase批量写入数据 工作中遇到的问题 即 bulkload 实现hive 到Hbase批量写入数据
HBase基本操作 增删改查 java代码 要使用须导入对应的jar包
介绍了大数据平台如何将hdfs中的分布式文件导入hbase 。源代码在cloudera-SCM 的cdh 4.8.1产品环境中验证通过。
java操作Hbase之Hbase专用过滤器PageFilter的使用源代码,附带全部所需源代码,欢迎下载学习。
hbase源码,适合研究分析底层实现。对hbase的原理的理解很有好处
hbase 常用参数含义,默认值,调优建议(必须参数,split,compaction,blockcache,memstore flush,hlog,zookeeper,其他,等相参数名称、含义、默认值、调优建议)
Hbase HLog源代码阅读笔记 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)...
hbase批量加载 从RCFile进行HBase批量加载的临时代码 这将使用LoadIncrementalFiles从HFileOutputFormat2中Mapreduce写入的数据中加载HBase表。
hbase权威指南 配套源码,自己从官网上下载的,拿来分享下
hbase权威指南源码