Analysis of the compaction flow
As with split, compaction is triggered either from the client or by a check performed at flush time.
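In addition, when the RegionServer starts it creates a CompactionChecker thread that periodically checks whether a compaction is needed.
The interval between runs of this thread is set by hbase.server.thread.wakefrequency (default 10*1000 ms).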
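What the CompactionChecker thread does:
It is a periodic checker, run every hbase.server.thread.wakefrequency (10*1000 ms), that checks whether regions need compaction.
If a compaction is needed, it submits a compaction request to the compaction threads.
The priority of the major compactions it requests comes from hbase.regionserver.compactionChecker.majorCompactPriority (default Integer.MAX_VALUE, meaning "use the default priority").
The configured value is used only when it is no greater (i.e. at least as urgent) than the region's current compaction priority.
How often each store is actually checked is controlled by hbase.server.compactchecker.interval.multiplier,
a unitless multiplier whose default is 1000.
The effective check period is therefore wakefrequency*multiplier ms.
In other words, by default a store is checked only once every 1000 runs of the thread (10,000 s, about 2.8 hours).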
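The scheduling arithmetic above can be sketched in Java. This is a minimal illustration, not HBase code. The class and method names are hypothetical. It only mirrors three things: the multiplication described above, and the "iteration % multiplier" test and counter wrap-around that appear in CompactionChecker.chore() later in this article.

```java
/** Hypothetical helper mirroring the CompactionChecker scheduling arithmetic. */
final class CompactionCheckSchedule {
  private CompactionCheckSchedule() {}

  /** Time between two checks of the same store: wakefrequency * multiplier (ms). */
  static long checkPeriodMillis(long wakeFrequencyMs, long multiplier) {
    return wakeFrequencyMs * multiplier;
  }

  /** True on the runs where chore() does not "continue" past a store. */
  static boolean checksStoreOnIteration(long iteration, long multiplier) {
    return iteration % multiplier == 0;
  }

  /** The chore's iteration counter wraps back to 0 instead of overflowing. */
  static long nextIteration(long iteration) {
    return (iteration == Long.MAX_VALUE) ? 0 : iteration + 1;
  }
}
```

With the defaults, checkPeriodMillis(10_000, 1000) is 10,000,000 ms. Only iterations 0, 1000, 2000 and so on look at a given store.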
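a. Condition for requesting a compaction during a check:
the number of files in a store, minus the number already being compacted (or already requested for compaction), is greater than or equal to
hbase.hstore.compaction.min.
Older versions configure this with hbase.hstore.compactionThreshold; the default is 3.
b. Major compaction check:
hbase.hregion.majorcompaction sets the major compaction period.
hbase.hregion.majorcompaction.jitter sets its jitter.
With a period of 1000*60*60*24 (24 hours) and a jitter of 0.2, the major compaction time floats by +/- 4.8 hours. In the code version analysed here the defaults are 1000*60*60*24*7 (7 days) and 0.5 (see the RatioBasedCompactionPolicy steps below).
b2. If (current time - major compaction period) > the timestamp of the oldest file in the store, a major compaction may be due:
b2.1> if the store has only one file and that file has reached its major compaction time,
check whether its TTL has been reached (Integer.MAX_VALUE means no TTL is configured). Major compact only if it has; otherwise do nothing.
b2.2> if there is more than one file and the major compaction time has been reached, major compact.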
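Conditions a and b can be written as small pure functions. This is a sketch under stated assumptions, not the HBase implementation. The names are hypothetical.

The jitter formula used below is an assumption about how the 0.96/0.98 policy code computes the period: period + jitter - round(2 * jitter * rnd), with jitter = round(period * jitterPct). HBase derives rnd from the store files so that it stays stable between checks; here it is simply passed in. NO_TTL stands for "no TTL configured".

```java
/** Hypothetical sketch of the CompactionChecker conditions a and b. */
final class StoreCompactionChecks {
  /** Stands in for "no TTL configured". */
  static final long NO_TTL = Long.MAX_VALUE;

  private StoreCompactionChecks() {}

  /** Condition a: files not already being compacted >= hbase.hstore.compaction.min. */
  static boolean needsCompaction(int storeFileCount, int compactingCount, int minFilesToCompact) {
    return storeFileCount - compactingCount >= minFilesToCompact;
  }

  /** Jittered major compaction period; rnd is expected to be in [0, 1). */
  static long majorCompactionPeriod(long periodMs, double jitterPct, double rnd) {
    if (periodMs <= 0 || jitterPct <= 0) {
      return periodMs;
    }
    long jitter = Math.round(periodMs * jitterPct);
    return periodMs + jitter - Math.round(2L * jitter * rnd);
  }

  /**
   * Condition b: the oldest store file is older than the (jittered) period; a single
   * file additionally needs its oldest cell to be older than the TTL.
   */
  static boolean isMajorCompactionDue(int fileCount, long lowestFileTimestamp,
      long oldestCellTimestamp, long now, long periodMs, long ttlMs) {
    if (fileCount == 0 || periodMs <= 0) {
      return false;                 // nothing to compact, or periodic majors disabled
    }
    if (lowestFileTimestamp <= 0 || lowestFileTimestamp >= now - periodMs) {
      return false;                 // b2 not met: major compaction time not reached yet
    }
    if (fileCount > 1) {
      return true;                  // b2.2
    }
    return ttlMs != NO_TTL && now - oldestCellTimestamp > ttlMs; // b2.1
  }
}
```

For a 24-hour period with jitter 0.2, rnd = 0 gives 28.8 hours and rnd = 0.5 gives exactly 24 hours.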
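Client-side compactRegion request
The client calls HBaseAdmin.compact, which makes an RPC call to the RegionServer's compactRegion.
If a table name rather than a region name is passed, the client iterates over all regions of the table and calls HRegionServer.compactRegion for each of them.
HRegionServer.compactRegion, invoked by the client: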
public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    // Get the HRegion instance for the requested region from onlineRegions
    HRegion region = getRegion(request.getRegion());
    region.startRegionOperation(Operation.COMPACT_REGION);
    LOG.info("Compacting " + region.getRegionNameAsString());
    boolean major = false;
    byte [] family = null;
    Store store = null;
    // If the client request carries a column family, get the HStore of that family
    if (request.hasFamily()) {
      family = request.getFamily().toByteArray();
      store = region.getStore(family);
      if (store == null) {
        throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
          " does not exist in region " + region.getRegionNameAsString()));
      }
    }
    // Check whether this is a major compaction request
    if (request.hasMajor()) {
      major = request.getMajor();
    }
    // For a major compaction, set forceMajor on the store (or on every store of the region)
    if (major) {
      if (family != null) {
        store.triggerMajorCompaction();
      } else {
        region.triggerMajorCompaction();
      }
    }
    String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
    LOG.trace("User-triggered compaction requested for region " +
      region.getRegionNameAsString() + familyLogMsg);
    String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
    // Major and normal requests alike are then submitted through compactSplitThread.requestCompaction
    if(family != null) {
      compactSplitThread.requestCompaction(region, store, log,
        Store.PRIORITY_USER, null);
    } else {
      compactSplitThread.requestCompaction(region, log,
        Store.PRIORITY_USER, null);
    }
    return CompactRegionResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
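The dispatch above can be modelled without a cluster. The sketch below is a hypothetical, simplified model: StoreModel, RegionModel and this compactRegion are illustration-only names, not HBase classes. It captures only two things: which stores get their forceMajor flag set, and which stores a PRIORITY_USER request is queued for.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the dispatch in HRegionServer.compactRegion. */
final class CompactRegionDispatch {
  private CompactRegionDispatch() {}

  static final class StoreModel {
    final String family;
    boolean forceMajor; // what triggerMajorCompaction() sets
    StoreModel(String family) { this.family = family; }
  }

  static final class RegionModel {
    final Map<String, StoreModel> stores = new LinkedHashMap<>();
    RegionModel(String... families) {
      for (String f : families) {
        stores.put(f, new StoreModel(f));
      }
    }
  }

  /**
   * Returns the families for which a PRIORITY_USER compaction request would be queued.
   * family == null means the whole region, as when the request carries no column family.
   */
  static List<String> compactRegion(RegionModel region, String family, boolean major) {
    List<StoreModel> targets = new ArrayList<>();
    if (family != null) {
      StoreModel store = region.stores.get(family);
      if (store == null) {
        // The real method wraps an IOException in a ServiceException here.
        throw new IllegalArgumentException("column family " + family + " does not exist");
      }
      targets.add(store);
    } else {
      targets.addAll(region.stores.values());
    }
    List<String> queued = new ArrayList<>();
    for (StoreModel store : targets) {
      if (major) {
        store.forceMajor = true;  // store/region.triggerMajorCompaction()
      }
      queued.add(store.family);   // compactSplitThread.requestCompaction(..., PRIORITY_USER, null)
    }
    return queued;
  }
}
```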
Non-major compaction flow
requestCompaction accepts either a store or a region.
When a region is passed, it gets all stores of the region and issues a compaction request for each store.
Every non-major compaction request ends up in the following method:
private synchronized CompactionRequest requestCompactionInternal(final HRegion r,
final Store s,
final String why, int priority, CompactionRequest request, boolean selectNow)
Handling of a compaction request for a store:
To disable compaction for an HBase table, set the COMPACTION_ENABLED attribute when creating the table. The method returns immediately for such tables.
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
    final String why, int priority, CompactionRequest request, boolean selectNow)
    throws IOException {
  if (this.server.isStopped()
      || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
    return null;
  }
  CompactionContext compaction = null;
  // In this call (a user request) selectNow is true; for a system request it is false.
  // When the request is handed to a CompactSplitThread.CompactionRunner, a system request
  // therefore passes a null CompactionContext, while a user request gets its compaction here.
  if (selectNow) {
    // HStore.requestCompaction returns a CompactionContext and selects the store files to compact.
    // For a client request, request.priority is Store.PRIORITY_USER.
    // For a system request, such as one triggered by a flush, the priority is
    // hbase.hstore.blockingStoreFiles minus the number of store files;
    // if that difference equals PRIORITY_USER, PRIORITY_USER + 1 is used instead.
    compaction = selectCompaction(r, s, priority, request);
    if (compaction == null) return null; // message logged inside
  }
  // We assume that most compactions are small. So, put system compactions into small
  // pool; we will do selection there, and move to large pool if necessary.
  long size = selectNow ? compaction.getRequest().getSize() : 0;
  // As written, this expression never yields largeCompactions (system requests are re-checked
  // later in CompactionRunner): when selectNow is false, size is 0 and cannot exceed
  // hbase.regionserver.thread.compaction.throttle (default
  // hbase.hstore.compaction.max * 2 * memstore flush size); when selectNow is true, !selectNow is false.
  ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
      ? largeCompactions : smallCompactions;
  // Create a CompactionRunner in the chosen pool (smallCompactions) and run it;
  // see the compaction execution flow
  pool.execute(new CompactionRunner(s, r, compaction, pool));
  if (LOG.isDebugEnabled()) {
    String type = (pool == smallCompactions) ? "Small " : "Large ";
    LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
  }
  return selectNow ? compaction.getRequest() : null;
}
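The pool choice can be reproduced in isolation to see why enqueue time always picks the small pool. In this sketch the names are hypothetical. The strict greater-than in throttleCompaction and the default throttle point of 2 * hbase.hstore.compaction.max * memstore flush size follow the text above. The 128 MB flush size in the test values is only an assumed figure.

```java
/** Hypothetical sketch of the small/large pool choice in CompactSplitThread. */
final class CompactionPoolChoice {
  enum Pool { SMALL, LARGE }

  private CompactionPoolChoice() {}

  /** Default throttle point: 2 * hbase.hstore.compaction.max * memstore flush size. */
  static long defaultThrottlePoint(int maxFilesToCompact, long memstoreFlushSize) {
    return 2L * maxFilesToCompact * memstoreFlushSize;
  }

  /** A compaction counts as large when its total size exceeds the throttle point. */
  static boolean throttleCompaction(long compactionSize, long throttlePoint) {
    return compactionSize > throttlePoint;
  }

  /** The enqueue-time expression exactly as quoted in requestCompactionInternal. */
  static Pool poolAtEnqueue(boolean selectNow, long selectedSize, long throttlePoint) {
    long size = selectNow ? selectedSize : 0;
    return (!selectNow && throttleCompaction(size, throttlePoint)) ? Pool.LARGE : Pool.SMALL;
  }

  /** The re-check CompactionRunner.run() performs once a system compaction has selected files. */
  static Pool poolAfterSelection(long selectedSize, long throttlePoint) {
    return throttleCompaction(selectedSize, throttlePoint) ? Pool.LARGE : Pool.SMALL;
  }
}
```

poolAtEnqueue returns SMALL however large the selection is, for both values of selectNow. Only poolAfterSelection, which stands for the re-check in CompactionRunner.run(), can return LARGE. With hbase.hstore.compaction.max = 10 and a 128 MB flush size, the throttle point is 2,684,354,560 bytes (2.5 GB).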
Creating the CompactionRequest
HStore.requestCompaction selects the store files to compact and creates a CompactionContext:
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
    throws IOException {
  // don't even select for compaction if writes are disabled
  if (!this.areWritesEnabled()) {
    return null;
  }
  // Creates a DefaultStoreEngine.DefaultCompactionContext instance (with the default storeEngine)
  CompactionContext compaction = storeEngine.createCompaction();
  this.lock.readLock().lock();
  try {
    synchronized (filesCompacting) {
      // First, see if coprocessor would want to override selection.
      if (this.getCoprocessorHost() != null) {
        List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
        boolean override = this.getCoprocessorHost().preCompactSelection(
            this, candidatesForCoproc, baseRequest);
        if (override) {
          // Coprocessor is overriding normal file selection.
          compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
        }
      }
      // Normal case - coprocessor is not overriding file selection.
      if (!compaction.hasSelection()) {
        // true for a compaction requested by the client, false for one triggered by a flush
        boolean isUserCompaction = priority == Store.PRIORITY_USER;
        // About offPeakHours:
        // 1. hbase.offpeak.start.hour sets the start hour of the off-peak window, e.g. 1
        // 2. hbase.offpeak.end.hour sets the end hour of the off-peak window, e.g. 2
        // When the current hour falls inside that window, a different ratio is used to decide whether
        // a file is too large to compact. Each file is compared with the total size of the following
        // (hbase.hstore.compaction.max - 1) files; hbase.hstore.compaction.max defaults to 10.
        // For example, with 10 candidate files the first file (i=0) is compared with files 1 to
        // i + max(10) - 1 = 9; if it is larger than that total times the ratio, it is not compacted.
        // If 1 and 2 are both set (not -1), start < end and the current hour is inside the window,
        // the ratio is hbase.hstore.compaction.ratio.offpeak (default 5.0f);
        // otherwise it is hbase.hstore.compaction.ratio (default 1.2f).
        boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
            offPeakCompactionTracker.compareAndSet(false, true);
        try {
          // Calls select() on the DefaultStoreEngine.DefaultCompactionContext instance, which returns
          // true if there is a compaction request and false if there is none (for details of
          // compaction.select see the major compaction flow).
          // It ends up calling RatioBasedCompactionPolicy.selectCompaction, which builds the
          // CompactionRequest and stores it in the request field of the DefaultCompactionContext;
          // the selected store files are then added to HStore.filesCompacting (below).
          // The last argument, forceMajor, is true only when a major compaction was requested and
          // filesCompacting is empty; in all other cases it is false.
          // a. compaction.select gets the list of all store files of this store and passes it to
          //    compactionPolicy.selectCompaction.
          // RatioBasedCompactionPolicy.selectCompaction works as follows:
          // 1. Check whether the number of store files minus the number being compacted reaches
          //    hbase.hstore.blockingStoreFiles (default 7):
          //    a. if filesCompacting (the files being compacted) is not empty, check whether
          //       storefiles - compacting + 1 >= blockingStoreFiles;
          //    b. if filesCompacting is empty, check whether storefiles - compacting >= blockingStoreFiles.
          // 2. Remove the files being compacted (filesCompacting) from the list of all store files,
          //    giving the eligible store files that are not being compacted yet.
          // 3. If MIN_VERSIONS is not set in the column family (=0), use the configured TTL
          //    (configured in seconds; unset means Integer.MAX_VALUE, and -1 is treated the same,
          //    i.e. no TTL); otherwise use Long.MAX_VALUE.
          // 4. If hbase.store.delete.expired.storefile is true (default true) and a TTL is set,
          //    collect the TTL-expired store files from the list obtained in 2.
          //    4.1 If there are expired store files, build a CompactionRequest for them and return it.
          //    4.2 If there are none, remove store files larger than hbase.hstore.compaction.max.size
          //        (default Long.MAX_VALUE), so that large files are left out of small compactions.
          // 5. Check whether a major compaction is needed:
          //    5.1 the period comes from hbase.hregion.majorcompaction, default 1000*60*60*24*7;
          //    5.2 the jitter comes from hbase.hregion.majorcompaction.jitter, default 0.5f;
          //    5.3 check whether the oldest store file is older than the period from 5.1 adjusted by
          //        the jitter from 5.2 (by default 7 days +/- 3.5 days, i.e. between 3.5 and 10.5 days;
          //        with a 24-hour period and a jitter of 0.2 the adjustment is +/- 4.8 hours);
          //    5.4 if that time has not been reached, no major compaction is needed;
          //    5.5 if it has been reached or exceeded, a major compaction is needed:
          //        a. with a single store file, only if its oldest data has exceeded the TTL;
          //        b. with several store files, always.
          // 6. A further condition on top of 5: the number of store files to compact must be
          //    less than hbase.hstore.compaction.max (default 10).
          // 7. If both 5 and 6 hold, or the region has reference files (left by a split),
          //    the compaction is upgraded to a major compaction.
          // 8. If it was not upgraded to major, remove bulk-loaded files from the list.
          // 9. Find the largest store files, i.e. those larger than the ratio times the total size of
          //    the following files, and leave them out (see the offPeakHours notes above).
          // 10. If store files remain, check that their number reaches the minimum,
          //     hbase.hstore.compaction.min (default 3; older versions: hbase.hstore.compactionThreshold);
          //     if it does not, nothing is compacted.
          // 11. Unless this is a user-requested major compaction, drop the store files beyond
          //     hbase.hstore.compaction.max (default 10).
          // 12. Build and return a CompactionRequest. If it is not major and the current time is in
          //     the off-peak window, its isOffPeak is set to true; otherwise (including major) false.
          compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
        } catch (IOException e) {
          if (mayUseOffPeak) {
            offPeakCompactionTracker.set(false);
          }
          throw e;
        }
        assert compaction.hasSelection();
        if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
          // Compaction policy doesn't want to take advantage of off-peak.
          offPeakCompactionTracker.set(false);
        }
      }
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCompactSelection(
            this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
      }
      // Selected files; see if we have a compaction with some custom base request.
      if (baseRequest != null) {
        // Update the request with what the system thinks the request should be;
        // its up to the request if it wants to listen.
        compaction.forceSelect(
            baseRequest.combineWith(compaction.getRequest()));
      }
      // Finally, we have the resulting files list. Check if we have any files at all.
      final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
      if (selectedFiles.isEmpty()) {
        return null;
      }
      // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
      if (!Collections.disjoint(filesCompacting, selectedFiles)) {
        Preconditions.checkArgument(false, "%s overlaps with %s",
            selectedFiles, filesCompacting);
      }
      // Add the store files selected for this compaction to HStore.filesCompacting
      filesCompacting.addAll(selectedFiles);
      // Sort by store file sequence id, ascending
      Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
      // If we're enqueuing a major, clear the force flag.
      // If the number of selected files equals the number of store files in this store,
      // the compaction is promoted to major
      boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
      this.forceMajor = this.forceMajor && !isMajor;
      // Set common request properties.
      // Set priority, either override value supplied by caller or from store.
      compaction.getRequest().setPriority(
          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
      compaction.getRequest().setIsMajor(isMajor);
      compaction.getRequest().setDescription(
          getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
    }
  } finally {
    this.lock.readLock().unlock();
  }
  LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
      + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
  this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
  return compaction;
}
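The bookkeeping at the end of requestCompaction can be sketched as follows. It covers the overlap check, adding to filesCompacting, sorting by sequence id, the isMajor test and the priority fallback. This is an illustration, not the HBase code. FileRef is a hypothetical stand-in for StoreFile with only a name and a sequence id. NO_PRIORITY is a sentinel playing the role of Store.NO_PRIORITY.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the filesCompacting bookkeeping in HStore.requestCompaction. */
final class SelectionBookkeeping {
  /** Minimal stand-in for StoreFile: a name and a sequence id. */
  static final class FileRef {
    final String name;
    final long seqId;
    FileRef(String name, long seqId) { this.name = name; this.seqId = seqId; }
  }

  /** Sentinel meaning "the caller supplied no priority". */
  static final int NO_PRIORITY = Integer.MIN_VALUE;

  private SelectionBookkeeping() {}

  /**
   * Rejects a selection that overlaps files already being compacted, adds it to
   * filesCompacting and re-sorts by sequence id. Returns true when the selection covers
   * every store file, i.e. when the request is marked as major.
   */
  static boolean registerSelection(List<FileRef> filesCompacting, Collection<FileRef> selected,
      int storeFileCount) {
    if (!Collections.disjoint(filesCompacting, selected)) {
      throw new IllegalArgumentException("selection overlaps with files being compacted");
    }
    filesCompacting.addAll(selected);
    filesCompacting.sort(Comparator.comparingLong(f -> f.seqId));
    return selected.size() == storeFileCount;
  }

  /** A caller-supplied priority wins; otherwise the store's own compaction priority is used. */
  static int resolvePriority(int requested, int storePriority) {
    return (requested != NO_PRIORITY) ? requested : storePriority;
  }
}
```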
Compaction execution flow
At execution time, a CompactSplitThread.CompactionRunner is created in the chosen thread pool and run there.
The runner works as follows:
public void run() {
  Preconditions.checkNotNull(server);
  if (server.isStopped()
      || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
    return;
  }
  // Common case - system compaction without a file selection. Select now.
  // compaction == null means a system compaction, not one requested by a user:
  // obtain a CompactionContext now
  if (this.compaction == null) {
    // When this runner was created, queuedPriority was set to hbase.hstore.blockingStoreFiles
    // minus the number of store files (2 if that difference is 1)
    int oldPriority = this.queuedPriority;
    // Re-read hbase.hstore.blockingStoreFiles minus the current number of store files
    this.queuedPriority = this.store.getCompactPriority();
    // A larger value than before means store files were removed (usually because another
    // compaction finished)
    if (this.queuedPriority > oldPriority) {
      // Store priority decreased while we were in queue (due to some other compaction?),
      // requeue with new priority to avoid blocking potential higher priorities.
      // End this run and resubmit the runner so it is queued with the latest priority
      this.parent.execute(this);
      return;
    }
    try {
      // HStore.requestCompaction returns a CompactionContext and selects the store files to compact.
      // For this system request, request.priority is hbase.hstore.blockingStoreFiles minus the
      // number of store files; if that equals PRIORITY_USER, PRIORITY_USER + 1 is used.
      // (For a client request, request.priority would be Store.PRIORITY_USER.)
      this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
    } catch (IOException ex) {
      LOG.error("Compaction selection failed " + this, ex);
      server.checkFileSystem();
      return;
    }
    if (this.compaction == null) return; // nothing to do
    // Now see if we are in correct pool for the size; if not, go to the correct one.
    // We might end up waiting for a while, so cancel the selection.
    assert this.compaction.hasSelection();
    // This is where the size check that has no effect in requestCompactionInternal is applied:
    // compaction.getRequest().getSize() is the total size of the store files selected here.
    // If it exceeds hbase.regionserver.thread.compaction.throttle (default
    // hbase.hstore.compaction.max * 2 * memstore flush size), use largeCompactions,
    // otherwise smallCompactions.
    ThreadPoolExecutor pool = store.throttleCompaction(
        compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
    // If the pool chosen now is not the one this runner was queued in, cancel the selection,
    // resubmit this runner to the new pool and end the current run
    if (this.parent != pool) {
      this.store.cancelRequestedCompaction(this.compaction);
      this.compaction = null;
      this.parent = pool;
      this.parent.execute(this);
      return;
    }
  }
  // Finally we can compact something.
  assert this.compaction != null;
  this.compaction.getRequest().beforeExecute();
  try {
    // Note: please don't put single-compaction logic here;
    // put it into region/store/etc. This is CST logic.
    long start = EnvironmentEdgeManager.currentTimeMillis();
    // HRegion.compact calls HStore.compact with the CompactionContext;
    // it returns true if the compaction completed and false otherwise
    boolean completed = region.compact(compaction, store);
    long now = EnvironmentEdgeManager.currentTimeMillis();
    LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
        this + "; duration=" + StringUtils.formatTimeDiff(now, start));
    if (completed) {
      // If the store still has at least hbase.hstore.blockingStoreFiles (default 7) files,
      // getCompactPriority() returns a value <= 0: more compaction is needed, so request it again
      // degenerate case: blocked regions require recursive enqueues
      if (store.getCompactPriority() <= 0) {
        requestSystemCompaction(region, store, "Recursive enqueue");
      } else {
        // The number of store files is back within the limit and no further compaction is
        // needed; check whether a split is needed and, if so, request one.
        // A split is requested when:
        // a. hbase.regionserver.regionSplitLimit (default Integer.MAX_VALUE) is greater than
        //    the number of online regions on this RegionServer;
        // b. in addition, hbase.hstore.blockingStoreFiles minus the number of store files is
        //    >= Store.PRIORITY_USER (1);
        // c. the table is neither the meta table nor the namespace table; for the remaining
        //    conditions see the split analysis.
        // see if the compaction has caused us to exceed max region size
        requestSplit(region);
      }
    }
  } catch (IOException ex) {
    IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
    LOG.error("Compaction failed " + this, remoteEx);
    if (remoteEx != ex) {
      LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
    }
    server.checkFileSystem();
  } catch (Exception ex) {
    LOG.error("Compaction failed " + this, ex);
    server.checkFileSystem();
  } finally {
    LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
  }
  this.compaction.getRequest().afterExecute();
}
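The priority arithmetic the runner relies on is easy to isolate. The sketch uses hypothetical names, with PRIORITY_USER = 1 as stated in the text. A lower value means a more urgent request. That is why a system priority of exactly 1 is bumped to 2, so it never takes the value reserved for user requests. It is also why a value that grew while waiting leads to a re-queue.

```java
/** Hypothetical sketch of the priority arithmetic used by CompactionRunner. */
final class RunnerPriority {
  /** Store.PRIORITY_USER, given as 1 in the text. */
  static final int PRIORITY_USER = 1;

  private RunnerPriority() {}

  /** blockingStoreFiles - storeFileCount, bumped to 2 if it would equal PRIORITY_USER. */
  static int storeCompactionPriority(int blockingStoreFiles, int storeFileCount) {
    int priority = blockingStoreFiles - storeFileCount;
    return (priority == PRIORITY_USER) ? priority + 1 : priority;
  }

  /** A queued system compaction is re-queued when its priority value grew while it waited. */
  static boolean shouldRequeue(int queuedPriority, int currentPriority) {
    return currentPriority > queuedPriority;
  }

  /** After a successful compaction, a value <= 0 means the store is still at the blocking limit. */
  static boolean needsRecursiveEnqueue(int priorityAfterCompaction) {
    return priorityAfterCompaction <= 0;
  }
}
```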
HStore.compact flow:
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
  assert compaction != null && compaction.hasSelection();
  CompactionRequest cr = compaction.getRequest();
  // Get the list of store files to compact
  Collection<StoreFile> filesToCompact = cr.getFiles();
  assert !filesToCompact.isEmpty();
  synchronized (filesCompacting) {
    // sanity check: we're compacting files that this store knows about
    // TODO: change this to LOG.error() after more debugging
    Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
  }
  // Ready to go. Have list of files to compact.
  LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
      + this + " of " + this.getRegionInfo().getRegionNameAsString()
      + " into tmpdir=" + fs.getTempDir() + ", totalSize="
      + StringUtils.humanReadableInt(cr.getSize()));
  long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
  List<StoreFile> sfs = null;
  try {
    // Run the compaction, merging all selected store files into one new store file under
    // the store's .tmp directory. DefaultCompactor.compact creates a StoreFileScanner for each
    // original store file and wraps them in a StoreScanner. If the compaction was promoted to
    // major, ScanType is COMPACT_DROP_DELETES, otherwise COMPACT_RETAIN_DELETES.
    // For how compaction scans the data, see the later analysis of the scan flow.
    // Commence the compaction.
    List<Path> newFiles = compaction.compact();
    // If hbase.hstore.compaction.complete is set to false, only check that each new store file
    // can be opened, and return without completing the compaction
    // TODO: get rid of this!
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      LOG.warn("hbase.hstore.compaction.complete is set to false");
      sfs = new ArrayList<StoreFile>();
      for (Path newFile : newFiles) {
        // Create storefile around what we wrote with a reader on it.
        StoreFile sf = createStoreFileAndReader(newFile);
        sf.closeReader(true);
        sfs.add(sf);
      }
      return sfs;
    }
    // Move the new store file into the column family directory and return it,
    // with its reader already opened
    // Do the steps necessary to complete the compaction.
    sfs = moveCompatedFilesIntoPlace(cr, newFiles);
    // Write a record describing this compaction to the WAL
    writeCompactionWalRecord(filesToCompact, sfs);
    // Remove the original store files from the store's storefiles list, add the new store file
    // and re-sort the list by sequence id (the storefiles list holds the store files and readers
    // used by scans); the compacted files are also removed from HStore.filesCompacting
    replaceStoreFiles(filesToCompact, sfs);
    // Move the compacted store files out of this store's directory on HDFS
    // At this point the store will use new files for all new scanners.
    completeCompaction(filesToCompact); // Archive old files & update store size.
  } finally {
    // Remove this request's files from HStore.filesCompacting. After a successful compaction
    // there is nothing left to remove; if it failed, the uncompacted files are released here
    finishCompactionRequest(cr);
  }
  logCompactionEndMessage(cr, sfs, compactionStartTime);
  return sfs;
}
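How the two file lists change when a compaction finishes can be sketched with plain lists. FileRef is a hypothetical stand-in for StoreFile. replaceStoreFiles and finishCompactionRequest below only mimic the list updates described in the comments above. No files, readers, WAL or archiving are involved.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the store's file lists when a compaction completes. */
final class CompactionCompletion {
  /** Minimal stand-in for StoreFile. */
  static final class FileRef {
    final String name;
    final long seqId;
    FileRef(String name, long seqId) { this.name = name; this.seqId = seqId; }
    @Override public String toString() { return name; }
  }

  private CompactionCompletion() {}

  /** Like replaceStoreFiles: swap compacted files for the results and keep seqId order. */
  static void replaceStoreFiles(List<FileRef> storefiles, List<FileRef> filesCompacting,
      Collection<FileRef> compacted, Collection<FileRef> results) {
    storefiles.removeAll(compacted);
    storefiles.addAll(results);
    storefiles.sort(Comparator.comparingLong(f -> f.seqId));
    filesCompacting.removeAll(compacted);
  }

  /** Like finishCompactionRequest: release the request's files (a no-op after success). */
  static void finishCompactionRequest(List<FileRef> filesCompacting, Collection<FileRef> requested) {
    filesCompacting.removeAll(requested);
  }
}
```

Because finishCompactionRequest runs in the finally block, the files of a failed compaction are released as well and can be selected again later.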
Major compaction flow
majorCompaction accepts either a store or a region.
When a region is passed, it gets all stores of the region and calls triggerMajorCompaction on each store.
HStore.triggerMajorCompaction only sets the store's forceMajor flag to true:
public void triggerMajorCompaction() {
this.forceMajor = true;
}
Once forceMajor is set, requestCompaction is called directly, just as for a normal request:
if(family != null) {
compactSplitThread.requestCompaction(region, store, log,
Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log,
Store.PRIORITY_USER, null);
}
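The life cycle of the forceMajor flag is spread over several methods. This hypothetical model (ForceMajorFlag is not an HBase class) puts its three steps side by side:
- triggerMajorCompaction sets it;
- selection only sees it when no other compaction is running in the store;
- it is cleared once a selection covers every store file.

```java
/** Hypothetical model of the forceMajor flag on a store. */
final class ForceMajorFlag {
  private boolean forceMajor;

  /** HStore.triggerMajorCompaction(): only sets the flag. */
  void triggerMajorCompaction() {
    forceMajor = true;
  }

  /** The last argument passed to compaction.select(...): forceMajor && filesCompacting.isEmpty(). */
  boolean forceMajorForSelection(boolean filesCompactingEmpty) {
    return forceMajor && filesCompactingEmpty;
  }

  /**
   * After selection: the request is major when every store file was selected, and only then
   * is the flag cleared (this.forceMajor = this.forceMajor && !isMajor).
   */
  boolean afterSelection(int selectedCount, int storeFileCount) {
    boolean isMajor = selectedCount == storeFileCount;
    forceMajor = forceMajor && !isMajor;
    return isMajor;
  }

  boolean isForceMajor() {
    return forceMajor;
  }
}
```

If another compaction is already running in the store, the flag is not passed to the policy. It survives, however, so a later request can still honour it.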
From here on the flow is essentially the same as for non-major compactions:
CompactSplitThread.requestCompaction --> requestCompactionInternal --> selectCompaction
--> HStore.requestCompaction(priority, request), which returns the CompactionContext.
The relevant code:
// Whether this compaction was requested by a user
boolean isUserCompaction = priority == Store.PRIORITY_USER;
// The expression below is true when:
// a. hbase.offpeak.start.hour is not -1 (a value from 0 to 23);
// b. hbase.offpeak.end.hour is not -1 (0 to 23) and is greater than the value in a;
// c. the hour of the current time lies between a and b;
// and offPeakCompactionTracker is not already held by another off-peak compaction.
// Otherwise it is false.
boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
    offPeakCompactionTracker.compareAndSet(false, true);
try {
  // Here the last argument is true (a major compaction was requested and no other
  // compaction is running in this store)
  compaction.select(this.filesCompacting, isUserCompaction,
      mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
} catch (IOException e) {
  if (mayUseOffPeak) {
    offPeakCompactionTracker.set(false);
  }
  throw e;
}
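The off-peak test can be sketched as a pure function. All names are hypothetical. The end hour is assumed to be exclusive. The comments above only describe the start < end case, so the branch for start > end (a window that wraps past midnight) is an assumption about how newer versions treat that case. The tracker argument plays the role of offPeakCompactionTracker.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the off-peak test used when selecting a compaction. */
final class OffPeakWindow {
  private OffPeakWindow() {}

  /**
   * True if hour (0-23) lies in [startHour, endHour). -1 (unset), out-of-range values or
   * start == end disable the window. Assumption: start > end wraps past midnight.
   */
  static boolean isOffPeakHour(int startHour, int endHour, int hour) {
    if (startHour < 0 || startHour > 23 || endHour < 0 || endHour > 23 || startHour == endHour) {
      return false;
    }
    if (startHour < endHour) {
      return startHour <= hour && hour < endHour;
    }
    return hour < endHour || startHour <= hour;
  }

  /** mayUseOffPeak: inside the window AND no other off-peak compaction holds the tracker. */
  static boolean mayUseOffPeak(AtomicBoolean tracker, int startHour, int endHour, int hour) {
    return isOffPeakHour(startHour, endHour, hour) && tracker.compareAndSet(false, true);
  }

  /** Ratio for the size check: ratio.offpeak (default 5.0) or ratio (default 1.2). */
  static double selectionRatio(boolean offPeak, double ratio, double offPeakRatio) {
    return offPeak ? offPeakRatio : ratio;
  }
}
```

The compareAndSet makes the off-peak ratio available to only one selection at a time. That is why the code above resets the tracker when select() throws or when the policy does not use off-peak.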
By default, compaction.select above is DefaultStoreEngine.DefaultCompactionContext.select:
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
    boolean mayUseOffPeak, boolean forceMajor) throws IOException {
  // Call RatioBasedCompactionPolicy.selectCompaction to get a CompactionRequest
  // and store it in the request field of this compaction instance
  request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
      filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
  return request != null;
}
RatioBasedCompactionPolicy.selectCompaction flow:
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting, final boolean isUserCompaction,
    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
  // Preliminary compaction subject to filters
  ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
  // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
  // able to compact more if stuck and compacting, because ratio policy excludes some
  // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
  int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
  // Whether the number of store files minus those already being compacted (plus one while a
  // compaction is in progress) reaches the blocking count,
  // hbase.hstore.blockingStoreFiles (default 7)
  boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
      >= storeConfigInfo.getBlockingFileCount();
  // Eligible store files: all store files except those currently being compacted
  candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
  LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
      filesCompacting.size() + " compacting, " + candidateSelection.size() +
      " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
  // TTL from the TTL attribute of the column family. Integer.MAX_VALUE, -1 or no value
  // means no TTL. The TTL only takes effect when MIN_VERSIONS is not set, and it is
  // configured in seconds. If these checks pass, the configured TTL is returned (in ms),
  // otherwise Long.MAX_VALUE.
  long cfTtl = this.storeConfigInfo.getStoreFileTtl();
  // If this is not a forced major compaction, a TTL is configured and
  // hbase.store.delete.expired.storefile is true (the default), collect the TTL-expired files
  // among the store files not being compacted; if there are any, build a CompactionRequest
  // for just those files and stop here.
  if (!forceMajor) {
    // If there are expired files, only select them so that compaction deletes them
    if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
      ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
      if (expiredSelection != null) {
        return new CompactionRequest(expiredSelection);
      }
    }
    // For a non-major compaction, drop non-reference store files (a split leaves reference
    // files) whose size exceeds hbase.hstore.compaction.max.size
    candidateSelection = skipLargeFiles(candidateSelection);
  }
  // Force a major compaction if this is a user-requested major compaction,
  // or if we do not have too many files to compact and this was requested
  // as a major compaction.
  // Or, if there are any references among the candidates.
  // The major compaction decision combines the following conditions:
  // (forceMajor && isUserCompaction)
  // a. the user requested a major compaction and no other files of this store are being compacted;
  // ((forceMajor || isMajorCompaction(candidateSelection))
  //     && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
  // b. of the three conditions in this expression, the first (b1) or the second (b2) must hold,
  //    and the third (b3) must hold:
  // forceMajor
  // b1. a major compaction was requested and no other files of this store are being compacted;
  // isMajorCompaction(candidateSelection)
  // b2. or the following checks pass:
  // b2.1 the oldest of the eligible store files has reached the major compaction interval;
  // b2.2 if there is only one eligible store file, its oldest cell has exceeded the TTL
  //      and the file has reached the major compaction interval;
  // b2.3 if there are several eligible store files, the oldest one has reached the major
  //      compaction interval;
  // b2.4 in other words, if the oldest file has reached the interval but it is the only eligible
  //      file, it was already major compacted (StoreFile.majorCompaction == true) and either no TTL
  //      is configured or the TTL has not expired, that file is not major compacted.
  // hbase.hregion.majorcompaction sets the major compaction interval and
  // hbase.hregion.majorcompaction.jitter its jitter,
  // e.g. with a 24-hour interval and a jitter of 0.2f: default = 20% = +/- 4.8 hrs
  // (candidateSelection.size() < comConf.getMaxFilesToCompact())
  // b3. the number of eligible store files is below hbase.hstore.compaction.max (default 10);
  // StoreUtils.hasReferences(candidateSelection)
  // c. the eligible store files include reference files (created by a split).
  boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
      );
  // Not a major compaction
  if (!majorCompaction) {
    // we're doing a minor compaction, let's see what files are applicable
    // Remove bulk-loaded store files from the eligible list
    candidateSelection = filterBulk(candidateSelection);
    // Drop the largest store files (scanning from the oldest) while at least
    // hbase.hstore.compaction.min files would remain. A file counts as too large when:
    // a. its size exceeds the ratio times the total size of the next
    //    (hbase.hstore.compaction.max - 1) files, e.g. with 10 candidates the first file (i=0)
    //    is compared with files 1 to 9; the ratio is hbase.hstore.compaction.ratio.offpeak
    //    (default 5.0f) inside the off-peak window set by hbase.offpeak.start.hour and
    //    hbase.offpeak.end.hour, and hbase.hstore.compaction.ratio (default 1.2f) otherwise;
    // b. and its size is also larger than hbase.hstore.compaction.min.size (default: the memstore
    //    flush size); files smaller than that are always eligible.
    // c. If no file would be selected and the store may be stuck (mayBeStuck: store files minus
    //    those being compacted reach hbase.hstore.blockingStoreFiles, default 7), the newest
    //    hbase.hstore.compaction.min files are compacted anyway (default 3, at least 2;
    //    older versions: hbase.hstore.compactionThreshold).
    candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
    // If fewer files than the minimum remain, clear the eligible list
    candidateSelection = checkMinFilesCriteria(candidateSelection);
  }
  // Unless this is a user-requested major compaction, drop the eligible store files beyond
  // hbase.hstore.compaction.max
  candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
  // Build the CompactionRequest
  CompactionRequest result = new CompactionRequest(candidateSelection);
  // For a minor compaction with off-peak hours configured and the current time inside the
  // window, mark the request as off-peak
  result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
  return result;
}
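The two central decisions of selectCompaction can be sketched over plain file sizes. This is a simplified model, not the HBase implementation:
- sizes stand in for StoreFile objects, ordered oldest first;
- majorDue stands in for isMajorCompaction(candidateSelection);
- the TTL, bulk-load, reference-file and mayBeStuck handling are left out.

The window sum follows the "next hbase.hstore.compaction.max - 1 files" rule described above.

```java
import java.util.Arrays;

/** Hypothetical, simplified sketch of RatioBasedCompactionPolicy's decisions. */
final class RatioSelectionSketch {
  private RatioSelectionSketch() {}

  /** Same boolean structure as majorCompaction in selectCompaction. */
  static boolean isMajor(boolean forceMajor, boolean isUserCompaction, boolean majorDue,
      int candidateCount, int maxFilesToCompact, boolean hasReferences) {
    return (forceMajor && isUserCompaction)
        || ((forceMajor || majorDue) && candidateCount < maxFilesToCompact)
        || hasReferences;
  }

  /**
   * Minor selection over sizes ordered oldest first: skip an older file while it is larger
   * than max(minCompactSize, ratio * total size of the next maxFiles - 1 files), then apply
   * the minimum and maximum file counts. Returns the sizes of the selected files.
   */
  static long[] selectMinor(long[] sizes, double ratio, int minFiles, int maxFiles,
      long minCompactSize) {
    int n = sizes.length;
    int minCount = Math.max(2, minFiles); // a minimum of at least 2 keeps sumSize[start + 1] in range
    if (n == 0) {
      return new long[0];
    }
    // sumSize[i] = sizes[i] + ... + sizes[i + maxFiles - 2], clipped at the end of the array
    long[] sumSize = new long[n];
    for (int i = n - 1; i >= 0; --i) {
      int tooFar = i + maxFiles - 1;
      sumSize[i] = sizes[i]
          + ((i + 1 < n) ? sumSize[i + 1] : 0)
          - ((tooFar < n) ? sizes[tooFar] : 0);
    }
    int start = 0;
    while (n - start >= minCount
        && sizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    long[] selected = Arrays.copyOfRange(sizes, start, n);
    if (selected.length < minCount) {
      return new long[0];                            // checkMinFilesCriteria
    }
    if (selected.length > maxFiles) {
      selected = Arrays.copyOf(selected, maxFiles);  // removeExcessFiles keeps the oldest
    }
    return selected;
  }
}
```

For sizes {100, 10, 10, 10} with ratio 1.2, the oldest file (100) is larger than 1.2 * 30 = 36 and is skipped, so the three files of size 10 are compacted. With the off-peak ratio 5.0 it stays in, because 100 is not larger than 5.0 * 30 = 150.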
The compaction itself is then executed as described in "Compaction execution flow" under the non-major compaction flow.
Compaction triggered by flush
Compaction after a flush is driven by MemStoreFlusher.FlushHandler.run.
Once flushRegion completes, it triggers a compaction request:
CompactSplitThread.requestSystemCompaction --> requestCompactionInternal(region)
public synchronized void requestSystemCompaction(
final HRegion r, final String why) throws IOException {
requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
}
CompactSplitThread.requestCompactionInternal(Region)-->requestCompactionInternal(Store)
private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
    int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
  // not a special compaction request, so make our own list
  List<CompactionRequest> ret = null;
  if (requests == null) {
    ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
    for (Store s : r.getStores().values()) {
      // Issue a compaction request for each store with priority Store.NO_PRIORITY;
      // see the non-major compaction flow
      CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
      if (selectNow) ret.add(cr);
    }
  } else {
    Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
    ret = new ArrayList<CompactionRequest>(requests.size());
    for (Pair<CompactionRequest, Store> pair : requests) {
      ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
    }
  }
  return ret;
}
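Compaction driven by the periodic thread
The periodic checks are performed by HRegionServer.CompactionChecker, whose behaviour is summarized at the beginning of this article: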
protected void chore() {
  for (HRegion r : this.instance.onlineRegions.values()) {
    if (r == null)
      continue;
    for (Store s : r.getStores().values()) {
      try {
        long multiplier = s.getCompactionCheckMultiplier();
        assert multiplier > 0;
        if (iteration % multiplier != 0) continue;
        // System compaction check: the number of store files minus those being compacted
        // is >= hbase.hstore.compaction.min
        if (s.needsCompaction()) {
          // Queue a compaction. Will recognize if major is needed.
          // Issue a system compaction request; see the flush-time compaction section
          this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
              + " requests compaction");
        } else if (s.isMajorCompaction()) {
          // isMajorCompaction() applies the b2 checks described under
          // RatioBasedCompactionPolicy.selectCompaction: the oldest store file has reached the
          // major compaction interval (hbase.hregion.majorcompaction, varied by
          // hbase.hregion.majorcompaction.jitter; 24 hours with 0.2f gives +/- 4.8 hrs), and a
          // single store file must also hold data past the TTL (a single file that was already
          // major compacted and is within the TTL, or has no TTL, is skipped).
          if (majorCompactPriority == DEFAULT_PRIORITY
              || majorCompactPriority > r.getCompactPriority()) {
            // Issue requestCompaction with the default priority; see note A below
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use default priority", null);
          } else {
            // Issue requestCompaction with the configured priority; see note B below
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use configured priority",
                this.majorCompactPriority, null);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed major compaction check on " + r, e);
      }
    }
  }
  iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}
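The per-store decision in chore() reduces to a small function. The names are hypothetical. DEFAULT_PRIORITY = Integer.MAX_VALUE is an assumption about CompactionChecker.DEFAULT_PRIORITY.

```java
/** Hypothetical sketch of the per-store decision in CompactionChecker.chore(). */
final class CheckerDecision {
  /** Assumed value of CompactionChecker.DEFAULT_PRIORITY ("use the default priority"). */
  static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;

  enum Action { NONE, SYSTEM_COMPACTION, MAJOR_DEFAULT_PRIORITY, MAJOR_CONFIGURED_PRIORITY }

  private CheckerDecision() {}

  static Action decide(boolean needsCompaction, boolean isMajorCompaction,
      int majorCompactPriority, int regionCompactPriority) {
    if (needsCompaction) {
      return Action.SYSTEM_COMPACTION;          // requestSystemCompaction(r, s, why); checked first
    }
    if (!isMajorCompaction) {
      return Action.NONE;
    }
    if (majorCompactPriority == DEFAULT_PRIORITY
        || majorCompactPriority > regionCompactPriority) {
      return Action.MAJOR_DEFAULT_PRIORITY;     // requestCompaction(r, s, why, null)
    }
    return Action.MAJOR_CONFIGURED_PRIORITY;    // requestCompaction(r, s, why, majorCompactPriority, null)
  }
}
```

Because needsCompaction() is tested first, a store that has enough files for a normal compaction never reaches the major branch in that run. The system request can still turn into a major compaction during selection.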
Note A:
CompactSplitThread.requestCompaction -->
requestCompaction(r, s, why, Store.NO_PRIORITY, request);
--> requestCompactionInternal(r, s, why, priority, request, true); selectNow is set to true here
Note B:
CompactSplitThread.requestCompaction -->
requestCompactionInternal(r, s, why, priority, request, true); selectNow is set to true here