
HBase: How Compaction and Splits Work

 

HRegionServer issuing a compaction request

The main logic is as follows:

//Iterate over each Store, compute the files that need compacting,
//build a CompactionRequest and submit it to a thread pool.
//throttleCompaction() decides whether the request goes to the
//largeCompactions pool or the smallCompactions pool.
CompactSplitThread#requestCompaction() {
	for (Store s : r.getStores().values()) {
		CompactionRequest cr = s.requestCompaction(priority, request);
		ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
		  ? largeCompactions : smallCompactions;
		pool.execute(cr);
		ret.add(cr);
	}
}

//If the total size of the CompactionRequest is greater than
//minFilesToCompact * 2 * memstoreFlushSize, the request is treated
//as "large" and goes to the largeCompactions pool; otherwise it
//goes to the smallCompactions pool.
Store#throttleCompaction() {
    long throttlePoint = conf.getLong(
        "hbase.regionserver.thread.compaction.throttle",
        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
    return compactionSize > throttlePoint;
}
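
As a rough worked example of the throttle rule (assuming the common defaults of hbase.hstore.compaction.min = 3 files and a 128 MB memstore flush size; both are configuration-dependent), the threshold comes out to 768 MB, so a hypothetical 900 MB selection would land in the large pool:

public class ThrottleExample {
    public static void main(String[] args) {
        // Assumed defaults; the real values come from the cluster configuration.
        int minFilesToCompact = 3;                    // hbase.hstore.compaction.min
        long memstoreFlushSize = 128L * 1024 * 1024;  // hbase.hregion.memstore.flush.size

        // Same formula as Store#throttleCompaction() above: 2 * 3 * 128 MB = 768 MB.
        long throttlePoint = 2L * minFilesToCompact * memstoreFlushSize;

        long compactionSize = 900L * 1024 * 1024;     // hypothetical total size of the selected files
        boolean useLargePool = compactionSize > throttlePoint;
        System.out.println("throttlePoint=" + throttlePoint + " largePool=" + useLargePool); // true
    }
}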


Store#compactSelection() {
	//Select the StoreFiles whose TTL has expired
	if (storefile.maxTimeStamp + store.ttl < now_timestamp) {
		//return the set of expired store files
	}

	//Scan from index 0 upward; while the file at pos is larger than
	//maxCompactSize (and is not a reference file), advance pos,
	//then drop those oversized files from the selection.
	while (pos < compactSelection.getFilesToCompact().size() &&
	       compactSelection.getFilesToCompact().get(pos).getReader().length()
	         > maxCompactSize &&
	       !compactSelection.getFilesToCompact().get(pos).isReference()) {
		++pos;
	}
	if (pos != 0) {
		compactSelection.clearSubList(0, pos);
	}
	if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {
		return;
	}
	
	//Build the sumSize array, with one entry per file in the Store.
	//sumSize[i] is the sum of fileSizes over the sliding window
	//[i, i + maxFilesToCompact - 1): fileSizes[i] plus sumSize[i+1]
	//(or 0), minus fileSizes[tooFar] (or 0).
	int countOfFiles = compactSelection.getFilesToCompact().size();
	long [] fileSizes = new long[countOfFiles];
	long [] sumSize = new long[countOfFiles];
	for (int i = countOfFiles-1; i >= 0; --i) {
		StoreFile file = compactSelection.getFilesToCompact().get(i);
		fileSizes[i] = file.getReader().length();
		// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
		int tooFar = i + this.maxFilesToCompact - 1;
		sumSize[i] = fileSizes[i] + ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
			- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
	}
	
	//While fileSizes[start] > Math.max(minCompactSize, sumSize[start+1] * r),
	//advance start: this skips files that are too large and would otherwise
	//dominate the compaction time.
	int start = 0;
	while (countOfFiles - start >= this.minFilesToCompact && fileSizes[start] >
		Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
		++start;
	}
	int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
	long totalSize = fileSizes[start] + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
	compactSelection = compactSelection.getSubList(start, end);

	//For a major compaction, if too many files were selected, trim the excess
	if (majorcompaction && compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
		int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
		compactSelection.getFilesToCompact().subList(0, pastMax).clear();
	}
}
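
The selection loop is easier to follow in isolation. The sketch below reproduces the same sliding-window/ratio check on a plain array of file sizes; the sizes, minCompactSize, maxFilesToCompact and the ratio r are illustrative values, not read from any real configuration:

import java.util.Arrays;

public class RatioSelectionSketch {
    public static void main(String[] args) {
        // File sizes in the order they sit in the Store (illustrative values, in MB).
        long[] fileSizes = {1000, 500, 30, 25, 20, 10};
        int minFilesToCompact = 3;
        int maxFilesToCompact = 10;
        long minCompactSize = 16;   // MB
        double r = 1.2;             // hbase.hstore.compaction.ratio

        int countOfFiles = fileSizes.length;
        long[] sumSize = new long[countOfFiles];
        // sumSize[i] = sum of fileSizes[i .. i + maxFilesToCompact - 1), computed back to front.
        for (int i = countOfFiles - 1; i >= 0; --i) {
            int tooFar = i + maxFilesToCompact - 1;
            sumSize[i] = fileSizes[i]
                    + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
                    - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
        }

        // Skip files that are too large relative to the rest of the window.
        int start = 0;
        while (countOfFiles - start >= minFilesToCompact
                && fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * r))) {
            ++start;
        }
        int end = Math.min(countOfFiles, start + maxFilesToCompact);

        // With these numbers the 1000 MB and 500 MB files are skipped and the
        // four small files {30, 25, 20, 10} are selected for compaction.
        System.out.println("selected: "
                + Arrays.toString(Arrays.copyOfRange(fileSizes, start, end)));
    }
}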


The CompactionRequest thread (runs both major and minor compactions)

The class diagram for the compaction-related classes is as follows:



There is actually very little difference between how a major and a minor compaction execute. If the total size of the files selected for compaction is greater than 2 * minFilesToCompact * memstoreFlushSize, the request is handed to the large compaction thread pool; otherwise it runs in the small pool.

In addition, when the StoreScanner is constructed it is told through a ScanType argument whether this is a major or a minor compaction, and the ScanQueryMatcher later returns different match codes depending on that ScanType (there are three types: user scan, minor compaction and major compaction).

The main logic is as follows:

//The compaction itself runs in a separate worker thread
CompactionRequest#run() {
	boolean completed = HRegion.compact(this);
	if (completed) {
		if (s.getCompactPriority() <= 0) {
			server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue", null);
		} else {
			// see if the compaction has caused us to exceed max region size
			server.getCompactSplitThread().requestSplit(r);
		}		
	}
}

//Delegates to the Store to perform the compaction
HRegion#compact() {
	Preconditions.checkArgument(cr.getHRegion().equals(this));
	lock.readLock().lock();
	cr.getStore().compact(cr);
	lock.readLock().unlock();
}

//Performs the compaction: Compactor#compact() does the core work,
//then the compacted file is moved into its final directory and the
//old files are deleted.
Store#compact() {
	List<StoreFile> filesToCompact = request.getFiles();
	StoreFile.Writer writer = this.compactor.compact(cr, maxId);
	if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
		sf = completeCompaction(filesToCompact, writer);
	} else {
		// Create storefile around what we wrote with a reader on it.
		sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
		  this.family.getBloomFilterType(), this.dataBlockEncoder);
		sf.createReader();
	}
}

//Moves /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1
//to
// /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/value/9c8614a6bd0d4833b419a13abfde5ac1,
//wraps the new target file in a StoreFile object,
//deletes the old files (their underlying HFiles have already been merged into one),
//and finally computes the size and other metadata of the new StoreFile and returns it.
Store#completeCompaction() {
	Path origPath = compactedFile.getPath();
	Path destPath = new Path(homedir, origPath.getName());
	HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath);
	StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
	      this.family.getBloomFilterType(), this.dataBlockEncoder);
	passSchemaMetricsTo(result);
	result.createReader();
}


//The core of the compaction logic!
//Merges multiple StoreFiles using a StoreScanner, which iterates over all the
//StoreFiles and emits their KeyValues in sorted order via a heap, writing each
//one out through StoreFile$Writer#append().
Compactor#compact() {
	for (StoreFile file : filesToCompact) {
		StoreFile.Reader r = file.getReader();
		long keyCount = (r.getBloomFilterType() == store.getFamily()
		  .getBloomFilterType()) ?
		  r.getFilterEntries() : r.getEntries();
		maxKeyCount += keyCount;
	}

	int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
	Compression.Algorithm compression = store.getFamily().getCompression();
	List<StoreFileScanner> scanners = StoreFileScanner
	  .getScannersForStoreFiles(filesToCompact, false, false, true);
	Scan scan = new Scan();
	scan.setMaxVersions(store.getFamily().getMaxVersions());

	//The ScanType is chosen according to the kind of compaction being run;
	//the ScanQueryMatcher later returns different match codes based on it.
	InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
		majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
		smallestReadPoint, earliestPutTs);
		
	do {
		hasMore = scanner.next(kvs, compactionKVMax);
		if (writer == null && !kvs.isEmpty()) {
			//Create a temporary file under the .tmp directory, e.g.
			// /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1
			writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
			  maxMVCCReadpoint >= smallestReadPoint);
		}
		for (KeyValue kv : kvs) {
			writer.append(kv);
		}
	} while (hasMore);

	scanner.close();
	StoreFile$Writer.appendMetadata(maxId, majorCompaction);
	StoreFile$Writer.close();
}

 

Class diagram illustrating the core logic of the compaction selection algorithm

The store files are ordered by age, and suitable ones are selected for compaction.

The sliding window starts at index 0 and skips files that are too large, which keeps compactions from taking too long.



Important classes involved

The class diagram for the classes used during the internal scan is as follows:


The key classes:

HBase's implementation of this algorithm relies on the following five classes.
1.org.apache.hadoop.hbase.regionserver.Store 
2.org.apache.hadoop.hbase.regionserver.StoreScanner 
3.org.apache.hadoop.hbase.regionserver.StoreFileScanner 
4.org.apache.hadoop.hbase.regionserver.KeyValueHeap 
5.org.apache.hadoop.hbase.regionserver.ScanQueryMatcher 

These five classes relate to each other as follows:
1. Store calls StoreScanner's next() method in a loop and writes the returned KeyValues to the compacted output file.
2. StoreScanner creates and holds a StoreFileScanner for each input file; it walks these StoreFileScanners and uses a KeyValueHeap to order the head record of every input file.
3. StoreFileScanner iterates over a single input file, managing and exposing that file's current head record.
4. KeyValueHeap orders the head records of all input files with a heap (see the sketch after this list).
5. ScanQueryMatcher decides, whenever an input file's head record is handed to it, whether that record should be emitted to the output or skipped, according to the configured policy.
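
To make the heap-based merge concrete, here is a minimal, self-contained sketch of the same technique: each scanner exposes the head record of one sorted input, and a PriorityQueue ordered by those head records always yields the globally smallest key next. The FileScanner class and string keys are illustrative stand-ins, not HBase types:

import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
    // Stand-in for a StoreFileScanner: iterates one sorted input and exposes its head record.
    static class FileScanner {
        private final Iterator<String> it;
        private String head;
        FileScanner(List<String> sorted) { this.it = sorted.iterator(); advance(); }
        String peek() { return head; }
        void advance() { head = it.hasNext() ? it.next() : null; }
    }

    public static void main(String[] args) {
        List<FileScanner> scanners = Arrays.asList(
                new FileScanner(Arrays.asList("a", "d", "g")),
                new FileScanner(Arrays.asList("b", "e", "h")),
                new FileScanner(Arrays.asList("c", "f", "i")));

        // Order scanners by their current head record, like KeyValueHeap's KVScannerComparator.
        PriorityQueue<FileScanner> heap = new PriorityQueue<>(Comparator.comparing(FileScanner::peek));
        for (FileScanner s : scanners) {
            if (s.peek() != null) heap.add(s);    // exhausted inputs are simply dropped
        }

        // Repeatedly take the scanner with the smallest head, emit it, advance it, and re-add it.
        StringBuilder merged = new StringBuilder();
        while (!heap.isEmpty()) {
            FileScanner top = heap.poll();
            merged.append(top.peek()).append(' '); // globally sorted output
            top.advance();
            if (top.peek() != null) heap.add(top);
        }
        System.out.println(merged);                // prints: a b c d e f g h i
    }
}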

 

The main logic of StoreScanner and its related classes is as follows:

//Holds the list of StoreFileScanners internally; creates a ScanQueryMatcher
//to decide whether each record is filtered out or emitted, and a KeyValueHeap
//for heap ordering, so that each step takes the record at the top of the heap.
//Note the ScanType constructor parameter: it distinguishes MAJOR_COMPACT,
//MINOR_COMPACT and USER_SCAN, and drives the matcher's return values so that
//the scan produces major- or minor-compaction semantics.
StoreScanner#constructor() {
	ScanQueryMatcher matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
        smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
	List<? extends KeyValueScanner> scanners = selectScannersFrom(scanners);
	for (KeyValueScanner scanner : scanners) {
		scanner.seek(matcher.getStartKey());
    }
    KeyValueHeap heap = new KeyValueHeap(scanners, store.comparator);
}

//Optionally updates the Bloom filters, then calls HFileWriterV2's append()
//to write out the KeyValue
StoreFile$Writer#append() {
      appendGeneralBloomfilter(kv);
      appendDeleteFamilyBloomFilter(kv);
      HFileWriterV2.append(kv);
      trackTimestamps(kv);
}

//This method wraps the handling of each record taken from the heap:
//the matcher's verdict on the record decides whether it is emitted or skipped.
StoreScanner#next() {
 KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }	
	LOOP: 
	while((kv = this.heap.peek()) != null) {    
		ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        	switch(qcode) {
          		case INCLUDE:
          		case INCLUDE_AND_SEEK_NEXT_ROW:
          		case INCLUDE_AND_SEEK_NEXT_COL:
            		Filter f = matcher.getFilter();
            		outResult.add(f == null ? kv : f.transform(kv));
            		count++;
            		if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              			if (!matcher.moreRowsMayExistAfter(kv)) {
                			return false;
              			}
              			reseek(matcher.getKeyForNextRow(kv));
            		} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              			reseek(matcher.getKeyForNextColumn(kv));
            		} else {
              			this.heap.next();
            		}          		        				
            		cumulativeMetric += kv.getLength();
            		if (limit > 0 && (count == limit)) {
              			break LOOP;
            		}
            		continue;
          		case DONE:
            		return true;
          		case DONE_SCAN:
            		close();
            		return false;		
          		case SEEK_NEXT_ROW:  
					if (!matcher.moreRowsMayExistAfter(kv)) {
              			return false;
            		}
            		reseek(matcher.getKeyForNextRow(kv));
            		break;  
          		case SEEK_NEXT_COL:
            		reseek(matcher.getKeyForNextColumn(kv));
            		break;
          		case SKIP:
            		this.heap.next();
            		break;
          		case SEEK_NEXT_USING_HINT:
            		KeyValue nextKV = matcher.getNextKeyHint(kv);
            		if (nextKV != null) {
              			reseek(nextKV);
            		} else {
              			heap.next();
            		}
            		break;
          		default:
            		throw new RuntimeException("UNEXPECTED");
        	} // end switch
	} // end while
}

//KeyValueHeap emits its results in heap-sorted order.
//Internally it uses a PriorityQueue with a KVScannerComparator
//as the comparison function.
KeyValueHeap#constructor() {
	this.comparator = new KVScannerComparator(comparator);
	heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
          this.comparator);
	for (KeyValueScanner scanner : scanners) {
		if (scanner.peek() != null) {
			this.heap.add(scanner);
        } else {
			scanner.close();
        }
	}
	this.current = pollRealKV();          
}

//The heap's most important method is next(). Note that its main job is not to
//compute the next KeyValue but to work out the next scanner; the caller then
//has to call peek() again to obtain the next KeyValue, which is not very concise.
KeyValueHeap#next() {
	InternalScanner currentAsInternal = (InternalScanner)this.current;
    boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
    KeyValue pee = this.current.peek();
	if (pee == null || !mayContainMoreRows) {
		this.current.close();
    } else {
		this.heap.add(this.current);
    }
    this.current = pollRealKV();
    return (this.current != null);    
}

//Other parts are omitted here; note these two assignments.
//For a minor compaction, KeyValues of type DELETE are kept in the output,
//whereas a major compaction drops the DELETE markers from the final output.
ScanQueryMatcher#constructor() {
	//.....
    /* how to deal with deletes */
    this.isUserScan = scanType == ScanType.USER_SCAN;
    this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
	//..
}
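
The practical effect of retainDeletesInOutput is that a minor compaction must write delete markers back out, because older versions of the deleted cells may still sit in files that are not part of this compaction; only a major compaction, which reads every file in the store, can safely drop them. A minimal sketch of that decision (the enum and the toy Cell class are illustrative, not the real ScanQueryMatcher logic):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeleteMarkerSketch {
    enum ScanType { USER_SCAN, MINOR_COMPACT, MAJOR_COMPACT }

    // Illustrative cell: a key plus a flag marking it as a delete tombstone.
    static class Cell {
        final String key; final boolean isDelete;
        Cell(String key, boolean isDelete) { this.key = key; this.isDelete = isDelete; }
        public String toString() { return key + (isDelete ? "(DELETE)" : ""); }
    }

    // Simplified version of the assignment in the ScanQueryMatcher constructor:
    // minor compactions (and raw scans) keep delete markers in the output.
    static boolean retainDeletesInOutput(ScanType type, boolean rawScan) {
        return type == ScanType.MINOR_COMPACT || rawScan;
    }

    // Toy "compaction" that only models what happens to the delete markers themselves.
    static List<Cell> compact(List<Cell> cells, ScanType type) {
        boolean keepDeletes = retainDeletesInOutput(type, false);
        List<Cell> out = new ArrayList<>();
        for (Cell c : cells) {
            if (keepDeletes || !c.isDelete) out.add(c);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Cell> input = Arrays.asList(new Cell("row1/cf:a", false), new Cell("row2/cf:a", true));
        System.out.println("minor: " + compact(input, ScanType.MINOR_COMPACT)); // tombstone kept
        System.out.println("major: " + compact(input, ScanType.MAJOR_COMPACT)); // tombstone dropped
    }
}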


HRegionServer issuing a split request


The execution logic is as follows:

//Split the region
HRegionServer#splitRegion() {
	HRegion region = getRegion(regionInfo.getRegionName());
    region.flushcache();
    region.forceSplit(splitPoint);
    compactSplitThread.requestSplit(region, region.checkSplit());		
}

//Create a SplitRequest object and submit it to the thread pool
CompactSplitThread#requestSplit() {
	ThreadPoolExecutor#execute(new SplitRequest(r, midKey, HRegionServer.this));	
}


Execution of the split thread

The moment the META table is updated

The main logic is as follows:

//Runs in a separate thread
SplitRequest#run() {
	SplitTransaction st = new SplitTransaction(parent, midKey);
	if (!st.prepare()) {
		return;	
	}
	st.execute(this.server, this.server);
}

//Core logic: prepare the two daughter regions and create a transient ZK node,
//then split the parent region: create the temporary split directory, close the
//parent, split the store files into place, create daughter regions A and B,
//open both of them, update the META table, update ZK, and take the parent offline.
SplitTransaction#execute() {
    PairOfSameType<HRegion> regions = createDaughters(server, services);
    openDaughters(server, services, regions.getFirst(), regions.getSecond());
    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());		
}


//Pre-create the HRegionInfo of the two daughter regions
SplitTransaction#prepare() {
	HRegionInfo hri = parent.getRegionInfo();
	hri_a = new HRegionInfo(hri.getTableName(), startKey, splitrow, false, rid);
	hri_b = new HRegionInfo(hri.getTableName(), splitrow, endKey, false, rid);
}

SplitTransaction#createDaughters() {
	//Create a ZK node such as /hbase/unassigned/fad11edf1e6e0a842b7fd3ad87f25053,
	//where the encoded region name is that of the region being split
	createNodeSplitting();
	//The journal records how far the transaction has progressed
	this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);

	//Use this node as the transaction node; it is deleted once the split completes
	transitionNodeSplitting();

	//Create an HDFS directory such as /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits
	//used as the temporary working area for the split files
	createSplitDir();

	//Close the region being split
	List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
	HRegionServer.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
	splitStoreFiles(this.splitdir, hstoreFilesToSplit);
	this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
	HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
	this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
	HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);

	//Update the META table
	MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
	    this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());

	//Return the two daughter regions A and B
	return new PairOfSameType<HRegion>(a, b);
}
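
The JournalEntry values recorded above exist so that a half-finished split can be rolled back step by step, in reverse order, if anything fails. A minimal sketch of this journal pattern follows; the entry names and undo actions are illustrative and do not mirror the actual SplitTransaction#rollback() implementation:

import java.util.ArrayDeque;
import java.util.Deque;

public class JournalRollbackSketch {
    enum JournalEntry { SET_SPLITTING_IN_ZK, CREATED_SPLIT_DIR, CLOSED_PARENT_REGION }

    private final Deque<JournalEntry> journal = new ArrayDeque<>();

    // Record a completed step; the most recent step sits on top of the deque.
    void record(JournalEntry e) { journal.push(e); }

    // Undo the recorded steps in reverse order; each case would revert one side effect.
    void rollback() {
        while (!journal.isEmpty()) {
            switch (journal.pop()) {
                case CLOSED_PARENT_REGION: System.out.println("reopen parent region"); break;
                case CREATED_SPLIT_DIR:    System.out.println("delete .splits directory"); break;
                case SET_SPLITTING_IN_ZK:  System.out.println("remove splitting znode"); break;
            }
        }
    }

    public static void main(String[] args) {
        JournalRollbackSketch tx = new JournalRollbackSketch();
        tx.record(JournalEntry.SET_SPLITTING_IN_ZK);
        tx.record(JournalEntry.CREATED_SPLIT_DIR);
        tx.record(JournalEntry.CLOSED_PARENT_REGION);
        tx.rollback(); // prints the undo actions newest-first
    }
}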

SplitTransaction#splitStoreFiles() {
	for (StoreFile sf : hstoreFilesToSplit) {
		//splitStoreFile(sf, splitdir);
		StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
		futures.add(threadPool.submit(sfs));
	}
	//Wait for all tasks in the thread pool to finish before returning
}

//Split a single store file
SplitTransaction$StoreFileSplitter#call() {
	splitStoreFile(sf, splitdir);
}

SplitTransaction#splitStoreFile() {
    FileSystem fs = this.parent.getFilesystem();
    byte [] family = sf.getFamily();
    String encoded = this.hri_a.getEncodedName();
    //The path looks like
    // /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits/1977310abc183fac9aba3dc626b01a2d
    //    /value/92e897822d804d3bb4805548e9a80bd2.fad11edf1e6e0a842b7fd3ad87f25053
    Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
    //Two reference files are created around splitRow: one covering from the
    //beginning of the file up to splitRow, the other from splitRow to the end.
    //They are written directly to the underlying file system via the HDFS API.
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
    encoded = this.hri_b.getEncodedName();
    storedir = Store.getStoreHomedir(splitdir, encoded, family);
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
}

//Depending on the Range argument this covers either the half from the start of
//the file to splitRow, or the half from splitRow to the end of the file.
//For the bottom half, if the file's first key is already greater than splitRow
//the half would be empty, so the method simply returns without writing a reference.
StoreFile#split() {
	if (range == Reference.Range.bottom) {
		KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
      	byte[] firstKey = f.createReader().getFirstKey();
      	if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
          	splitKey.getKeyOffset(), splitKey.getKeyLength(), 
          	firstKey, 0, firstKey.length) < 0) {
        	return null;
      	}   		
	} else {
		KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
      	byte[] lastKey = f.createReader().getLastKey();      
      	if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
          	splitKey.getKeyOffset(), splitKey.getKeyLength(), 
          	lastKey, 0, lastKey.length) > 0) {
        	return null;
      	}		
	}	
	Reference r = new Reference(splitRow, range);
	String parentRegionName = f.getPath().getParent().getParent().getName();
	Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
    return r.write(fs, p);
}
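
Note that StoreFile.split() copies no data at all: it only writes a small Reference file recording the split row and whether the child should see the bottom half (rows before the split row) or the top half (rows from the split row onward) of the parent file. The sketch below illustrates that idea with a toy Reference class; the real read path lives in HalfStoreFileReader and uses KeyValue comparators rather than plain strings:

public class HalfFileReferenceSketch {
    enum Range { TOP, BOTTOM }

    // Illustrative stand-in for an HBase Reference: no data is copied, only the
    // split row and which half of the parent file this child should see.
    static class Reference {
        final String splitRow; final Range range;
        Reference(String splitRow, Range range) { this.splitRow = splitRow; this.range = range; }

        // Decide whether a row from the parent file is visible through this reference.
        boolean contains(String row) {
            return range == Range.BOTTOM
                    ? row.compareTo(splitRow) < 0   // bottom half: rows before the split row
                    : row.compareTo(splitRow) >= 0; // top half: rows from the split row onward
        }
    }

    public static void main(String[] args) {
        Reference bottom = new Reference("row500", Range.BOTTOM);
        Reference top = new Reference("row500", Range.TOP);
        System.out.println(bottom.contains("row123")); // true
        System.out.println(top.contains("row123"));    // false
        System.out.println(top.contains("row999"));    // true
    }
}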

//Create one of the daughter HRegions
SplitTransaction#createDaughterRegion() {
    FileSystem fs = this.parent.getFilesystem();
    Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
      this.splitdir, hri);
    HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
      this.parent.getLog(), fs, this.parent.getBaseConf(),
      hri, this.parent.getTableDesc(), rsServices);
    long halfParentReadRequestCount = this.parent.getReadRequestsCount() / 2;
    r.readRequestsCount.set(halfParentReadRequestCount);
    r.setOpMetricsReadRequestCount(halfParentReadRequestCount);
    long halfParentWriteRequest = this.parent.getWriteRequestsCount() / 2;
    r.writeRequestsCount.set(halfParentWriteRequest);
    r.setOpMetricsWriteRequestCount(halfParentWriteRequest);    
    HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
    return r;	
}

//Sets the parent's info:regioninfo column to the offline/split state
//and adds two extra columns, info:splitA and info:splitB
MetaEditor#offlineParentInMeta() {
    HRegionInfo copyOfParent = new HRegionInfo(parent);
    copyOfParent.setOffline(true);
    copyOfParent.setSplit(true);
    Put put = new Put(copyOfParent.getRegionName());
    addRegionInfo(put, copyOfParent);
    put.add("info", "splitA",Writables.getBytes(a));
    put.add("info", "splitB",Writables.getBytes(b));
    putToMetaTable(catalogTracker, put);	
}



//DaughterOpener wraps an HRegion and calls HRegion#open() in a new thread.
//The META table is then updated; for a short while it contains both the
//parent region's entry (marked offline) and the two daughter regions' entries.
SplitTransaction#openDaughters() {
	DaughterOpener aOpener = new DaughterOpener(server, a);
    DaughterOpener bOpener = new DaughterOpener(server, b);
    aOpener.start();
    bOpener.start();
	aOpener.join();
	bOpener.join();    
	
	HRegionServer.postOpenDeployTasks(b, server.getCatalogTracker(), true);
	// Should add it to OnlineRegions
	HRegionServer.addToOnlineRegions(b);
	HRegionServer.postOpenDeployTasks(a, server.getCatalogTracker(), true);
	HRegionServer.addToOnlineRegions(a);	  	
}

//If a Store has reference files or needs a compaction, a compaction is requested;
//afterwards ZK or the ROOT and META tables are updated
HRegionServer#postOpenDeployTasks() {
	for (Store s : r.getStores().values()) {
		if (s.hasReferences() || s.needsCompaction()) {
        	getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
      	}
    }
    //Update ZK or the ROOT and META tables
	if (r.getRegionInfo().isRootRegion()) {
      	RootLocationEditor.setRootLocation(getZooKeeper(),
       	this.serverNameFromMasterPOV);
    } else if (r.getRegionInfo().isMetaRegion()) {
      	MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
        this.serverNameFromMasterPOV);
    } else {
      	if (daughter) {
        	// If daughter of a split, update whole row, not just location.
        	MetaEditor.addDaughter(ct, r.getRegionInfo(),
          	this.serverNameFromMasterPOV);
      	} else {
        	MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
          	this.serverNameFromMasterPOV);
      	}
    }    	
}

//Deletes the node fad11edf1e6e0a842b7fd3ad87f25053 (the region that was split)
//from under /hbase/unassigned in ZK
SplitTransaction#transitionZKNode() {
	transitionNodeSplit();
	tickleNodeSplit();	
}


Some auxiliary logic:

//Waits for flushes and compactions to finish, flushes the memstore,
//then closes all Stores on a thread pool
HRegion#close() {
	waitForFlushesAndCompactions();
	internalFlushcache();
	ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-"
            + this.regionInfo.getRegionNameAsString());
	CompletionService<ImmutableList<StoreFile>> completionService =
          new ExecutorCompletionService<ImmutableList<StoreFile>>(
            storeCloserThreadPool);	
            
	for (final Store store : stores.values()) {
		completionService.submit(new Callable<ImmutableList<StoreFile>>() {
			public ImmutableList<StoreFile> call() throws IOException {
				return store.close();
			}
		});
	}            
}

//Submits tasks to a thread pool to close every open StoreFile
Store#close() {
	for (final StoreFile f : result) {
		completionService.submit(new Callable<Void>() {
			public Void call() throws IOException {
				f.closeReader(true);
				return null;
			}
		});
	}
}


The CompactionChecker thread

This class periodically checks whether any Region on the region server needs a compaction.

The main logic is as follows:

//Continuously iterates over all Regions on the current RegionServer
//and checks whether a compaction is needed
CompactionChecker#chore() {
	for (HRegion r : this.instance.onlineRegions.values()) {
		for (Store s : r.getStores().values()) {
			if (s.needsCompaction()) {
				// Queue a compaction. Will recognize if major is needed.
				this.instance.compactSplitThread.requestCompaction(r, s, getName());
			} else if (s.isMajorCompaction()) {
				//The two branches differ only in the priority passed along
				//(the default priority versus the configured majorCompactPriority)
				if (majorCompactPriority == DEFAULT_PRIORITY
				    || majorCompactPriority > r.getCompactPriority()) {
					this.instance.compactSplitThread.requestCompaction(r, s, getName());
				} else {
					this.instance.compactSplitThread.requestCompaction(r, s, getName());
				}
			}
		}
	}
}
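
For completeness, the needsCompaction() test that drives this loop is, in the 0.94-era code this walkthrough appears to follow, essentially a file-count check: the number of store files not already being compacted is compared against the compaction threshold. A hedged sketch of that rule, with an assumed default threshold of 3:

public class NeedsCompactionSketch {
    // Hedged approximation of Store#needsCompaction(): compact when enough store
    // files are present that are not already part of a running compaction.
    static boolean needsCompaction(int storeFileCount, int filesAlreadyCompacting, int compactionThreshold) {
        return (storeFileCount - filesAlreadyCompacting) > compactionThreshold;
    }

    public static void main(String[] args) {
        int threshold = 3; // hbase.hstore.compaction.min (assumed default)
        System.out.println(needsCompaction(5, 0, threshold)); // true  -> queue a compaction
        System.out.println(needsCompaction(3, 0, threshold)); // false -> nothing to do
    }
}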



 

 
