`
阅读更多

 (编写不易,转载请注明:http://shihlei.iteye.com/blog/2432861) 

 

一 概述

项目用ElasticSearch作为检索引擎,索引规模不大,业务初期 “8千” ducuments,后续增长到最大 “30万”,索引大小约 1g。

 

业务要求性能优先,期望50ms响应。在没有 “ssd” 的硬件配置下,且 datanode 平均内存 大于 60g 的情况,热切期望 ElasticSearch 充分利用内存

 

实际情况:

ElasticSearch 使用了Lucene CacheOnLargeSegments Policy :document 小于 1万时 认为是小文档,不会利用 QueryCache。

 

因此对ElasticSearch 的 NodeQueryCache 进行了研究,本文旨在介绍 ElasticSearch NodeQueryCache的配置及底层实现。

 

注:

相关配置支持ElasticSearch 2.4 ,6.4 版本,其他版本未做确认。

 

二 ElasticSearch NodeQueryCache

1)概述

(1) Node query cache:

 

用于缓存Query的查询结果,每个Node一个,Node上所有Shard共享。Cache实现LRU的逐出策略,Cache满时,最少使用的数据会被逐出。

 

(2)ElasticSearch Query Caching Policy:缓存策略

 

Query Cache 内部使用 内存 bitset 缓存 满足 这次filter 查询的 ducuments,下次查询,直接从内存读取。

 

ElasticSearch默认缓存策略:

[1] non-scoring query :非评分查询(filter 查询)

 

[2] Query 在最近的256个查询中多次使用了多次

即:Lucene 的 UsageTrackingQueryCachingPolicy 策略:

Lucene认为:Query Cache 使用 BitSet 表示一个查询结果的DocId是集合。一个 field+fieldValue对应一个BitSet,如果field的取值规模庞大且大量查询的fieldValue的过于离散,则缓存命中率很低,复用性不高,缓存维护影响性能。

 

[3] Query 查询的Segment 包含超过 10000个 document(或总文档的3%,以较大者为准): 

即:Lucene 的 CacheOnLargeSegments 策略:

Lucene认为:小段快速搜索并快速合并,因此在此处缓存位集是没有意义的

 

特别注意:如果想开启全部缓存(非大segment),可配置 index.queries.cache.everything: true

 

官方doc:

Node Query Cache:https://www.elastic.co/guide/en/elasticsearch/reference/current/query-cache.html

All About Caching:https://www.elastic.co/guide/en/elasticsearch/guide/master/filter-caching.html

 

2)ElasticSearch Node Query Cache 重要配置:

  • indices.queries.cache.size:缓存大小,默认:最大堆的 10%,如果内存富裕可以加大
  • indices.queries.cache.count:缓存query数量,默认:10000,如果内存使用率上去不,可以增大
  • index.queries.cache.everything:是否缓存所有,默认:false(只缓存超过 10000 个文档段)

 三 Lucene QueryCache 分析

 1)概述: 

Lucene 查询过程,可以简述为 Query解析 》 Weight计算权重 》 Scorer 合并DocIdSet,进行打分的过程 其中 Scorer 的打分过程完成了“缓存” 使用流程,倒排合并,打分。

 

QueryCache 主要应用于 Scorer 倒排合并打分流程,会递归取得所有查询结果的DocIdSet,如果缓存有则从缓存中取。

 

流程时序:


缓存核心步骤概述:

(1)IndexSearcher 在search()方法中: 调用 createWeight() 根据Query 创建Weight 树,并调用LRUQueryCache,包装Weight 为 CachingWrapperWeight

(2)IndexSearcher 在search()方法中: 调用 CachingWrapperWeight.bulkScore() 创建 Score 树,该方法调用 LRUQueryCache.get() 从 LeafCache中读取 docSetId,如果缓存没有,则文件读取,并调用LRUQueryCache.putIfAbsent() 缓存供下次使用。

 

注:其中缓存要经过多次Policy的缓存策略判断,该Query的结果集是否需要缓存。

 

类职能:

(1)IndexSearcher:索引查询器

(2)LRUQueryCache:使用LRU策略实现的 QueryCache,提供多个 Index 的缓存能力。

(3)LeafCache:Leaf 缓存,提供根据 Query 查询 缓存的DocIdSet 能力。

(4)CachingWrapperWeight:提供weight 包装。

 

2) 源码分析

(1)QueryCache 缓存接口接口

public interface QueryCache {
	
  Weight doCache(Weight weight, QueryCachingPolicy policy);

}

 注:

接受个参数 weight,policy,返回一个Weight,单从接口上看,无法确定设计意图。

但是由于没有get,put方法,所以这个接口应该不是直接使用缓存的操作接口。

 

整体结构:

 

 

(2)IndexSearcher 缓存使用流程

【1】调用createWeight() 创建Weight树

 

<1> 创建Weight
public class IndexSearcher {

  // 。。。。。。

  private QueryCache queryCache = DEFAULT_QUERY_CACHE;
  private QueryCachingPolicy queryCachingPolicy = DEFAULT_CACHING_POLICY;

  static {
    final int maxCachedQueries = 1000;
    // min of 32MB or 5% of the heap size
    final long maxRamBytesUsed = Math.min(1L << 25, Runtime.getRuntime().maxMemory() / 20);
    DEFAULT_QUERY_CACHE = new LRUQueryCache(maxCachedQueries, maxRamBytesUsed);
  }

  public Weight createNormalizedWeight(Query query, boolean needsScores) throws IOException {
    query = rewrite(query);
    Weight weight = createWeight(query, needsScores);
    float v = weight.getValueForNormalization();
    float norm = getSimilarity(needsScores).queryNorm(v);
    if (Float.isInfinite(norm) || Float.isNaN(norm)) {
      norm = 1.0f;
    }
    weight.normalize(norm, 1.0f);
    return weight;
  }

  public Weight createWeight(Query query, boolean needsScores) throws IOException {
    final QueryCache queryCache = this.queryCache;
    Weight weight = query.createWeight(this, needsScores);
    if (needsScores == false && queryCache != null) {
      weight = queryCache.doCache(weight, queryCachingPolicy);
    }
    return weight;
  }

  // 。。。。。。

}

 

  说明:

  这里发现,最终调用的是LRUQueryCache.doCache() 方法

 

<2> 调用 LRUQueryCache.doCache()

 

public class LRUQueryCache implements QueryCache, Accountable {

  // 。。。。。。

  // 返回一个包装类 CachingWrapperWeight 无任何策略
  @Override
  public Weight doCache(Weight weight, QueryCachingPolicy policy) {
    while (weight instanceof CachingWrapperWeight) {
      weight = ((CachingWrapperWeight) weight).in;
    }
    return new CachingWrapperWeight(weight, policy);
  }

  // 。。。。。。

}

 

 说明:

代码很简单,只是在最后创建一个 CachingWrapperWeight,封装的实际的Weight 和 policy , 这里暂时不看细节。

 

【2】调用Weight的bulkScore() 创建Score树,触发缓存使用流程

 

<1>简单看下IndexSearcher 入口

 

public class IndexSearcher {

  // 。。。。。。

  protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
      throws IOException {

    // 。。。。。。

    for (LeafReaderContext ctx : leaves) { // search each subreader
      
      // 。。。。。。

      // 创建 Scorer 树,
      BulkScorer scorer = weight.bulkScorer(ctx);
      if (scorer != null) {
        try {
          // 合并倒排表,记录分数
          scorer.score(leafCollector, ctx.reader().getLiveDocs());
        } catch (CollectionTerminatedException e) {
          // collection was terminated prematurely
          // continue with the following leaf
        }
      }
    }
  }

  // 。。。。。。
}

 

说明:

weight.bulkScorer(ctx); 是缓存触发的入口,由 CachingWrapperWeight 提供

 

<2> CachingWrapperWeight 是LRUQueryCache的内部类,提供Cache使用的流程

 

 private class CachingWrapperWeight extends ConstantScoreWeight {

    private final Weight in;
    private final QueryCachingPolicy policy;
    private final AtomicBoolean used;

    CachingWrapperWeight(Weight in, QueryCachingPolicy policy) {
      super(in.getQuery());
      this.in = in;
      this.policy = policy;
      used = new AtomicBoolean(false);
    }

   	// 。。。。。。

    @Override
    public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {

	  // 。。。。。。

      // 缓存查询
      DocIdSet docIdSet = get(in.getQuery(), context);
      if (docIdSet == null) {
      	// 缓存未命中
        if (shouldCache(context)) {// 如果context允许缓存
          // 读取结果集的scorer , 并将其包含的 docIdSet 转换成  RoaringDocIdSet的内存结构
          docIdSet = cache(context);

          // 插入缓存
          putIfAbsent(in.getQuery(), context, docIdSet);
        } else {
          return in.bulkScorer(context);
        }
      }
	  
	  // 。。。。。。

	  final DocIdSetIterator disi = docIdSet.iterator();
      if (disi == null) {
        return null;
      }

      return new DefaultBulkScorer(new ConstantScoreScorer(this, 0f, disi));
    }

	// 。。。。。。

  }

  

  说明:bulkScorer() 方法的主要流程

  a) 使用LRUQueryCache.get() 查询缓存的 RoaringDocIdSet ,如果缓存命中,则返回,下面详述

  b) 如果未命中则,检查是否允许缓存,不允许缓存,直接调用IndexReader 读取数据

  c)允许缓存,则调用IndexReader 读取数据,转换成 RoaringDocIdSet 的内存结构,并插入缓存

 

(3)LRUQueryCache 缓存查询流程

 

【1】LRUQueryCache 流程:

 

public class LRUQueryCache implements QueryCache, Accountable {

  // 存储最近使用的Query列表
  private final Map<Query, Query> uniqueQueries;
  
  private final Set<Query> mostRecentlyUsedQueries;

  // 索引库对应的LeafCache缓存
  private final Map<Object, LeafCache> cache;

  // 。。。。。。

  synchronized DocIdSet get(Query key, LeafReaderContext context) {
 	// 。。。。。。

 	// 获取readerKey,一个IndexReader 实例对应一个
    final Object readerKey = context.reader().getCoreCacheKey();

    // 获得该IndexReader 实例的 LeafCache ,里面缓存了 Query对应的 DocIdSet
    final LeafCache leafCache = cache.get(readerKey);
    if (leafCache == null) { // 没有直接返回
      onMiss(readerKey, key);
      return null;
    }

    // 查询该Query最近有没有使用,没有使用直接返回,因为不会有缓存
    final Query singleton = uniqueQueries.get(key);
    if (singleton == null) {
      onMiss(readerKey, key);
      return null;
    }

    // 从对应的LeafCache中获取缓存结果 DocIdSet
    final DocIdSet cached = leafCache.get(singleton);
    if (cached == null) {
      onMiss(readerKey, singleton);
    } else {
      onHit(readerKey, singleton);
    }
    return cached;
  }
  // 。。。。。。
}

 

说明:

数据结构:

a)Map<Query, Query> uniqueQueries,Set<Query> mostRecentlyUsedQueries:用于存储该Query是否最近使用过。

b)Map<Object, LeafCache> cache:key是IndexReader,value是LeafCache,用于获取当前IndexReader的缓存

c)LeafCache:真实缓存:key:Query,Value :DocIdSet;

 

查询流程:

a)获取 当前 Query对应的IndexReader 对应的LeafCache,如果没有,则没有 cache 数据

b)通过 uniqueQueries 的记录判断 当前 Query 是否最近使用过,如果没有使用过,一定没有cache 数据

c)如果 Query 最近使用过,且存在Index对应的LeafCache,则直接从 LeafCache中获取 DocIdSet 返回

 

【2】LeafCache 的使用流程

 

private class LeafCache implements Accountable {
 	// 。。。。。。
 
    private final Map<Query, DocIdSet> cache;
    
    // 。。。。。。
  
    DocIdSet get(Query query) {
      assert query instanceof BoostQuery == false;
      assert query instanceof ConstantScoreQuery == false;
      assert query.getBoost() == 1f;
      return cache.get(query);
    }
}

 

说明:

 

很简单的一个Map 查询。

 

(4)LRUQueryCache 数据读取及缓冲填充流程

 

【1】CachingWrapperWeight 入口判断受允许使用缓存

 

 private class CachingWrapperWeight extends ConstantScoreWeight {


   	// 。。。。。。

    private boolean shouldCache(LeafReaderContext context) throws IOException {
      return cacheEntryHasReasonableWorstCaseSize(ReaderUtil.getTopLevelContext(context).reader().maxDoc())
          && policy.shouldCache(in.getQuery(), context);
    }

    private boolean cacheEntryHasReasonableWorstCaseSize(int maxDoc) {
      // 缓存本次查询所需空间
      final long worstCaseRamUsage = maxDoc / 8;
      // 内存总大小
      final long totalRamAvailable = maxRamBytesUsed;
      // 一次结果不能超过内存大小的 五分之一,否则会触发淘汰机制
      return worstCaseRamUsage * 5 < totalRamAvailable;
    }

	// 。。。。。。
  }

 

说明:

允许缓存的大条件如下:一次结果不能超过内存大小的 五分之一,否则内存消耗太大,否则会触发淘汰机制,影响性能

用户指定的Policy,后续详述

 
【2】允许缓存,则调用IndexReader 读取数据,转换成 RoaringDocIdSet的内存结构

 

<1> CachingWrapperWeight 读取数据,封装成缓存数据结构

 

private class CachingWrapperWeight extends ConstantScoreWeight {

   	// 。。。。。。

    private DocIdSet cache(LeafReaderContext context) throws IOException {
      final BulkScorer scorer = in.bulkScorer(context);
      if (scorer == null) {
        return DocIdSet.EMPTY;
      } else {
        return cacheImpl(scorer, context.reader().maxDoc());
      }
    }

	// 。。。。。。

 }

 

说明:

cacheImpl() 方法用于将读取到的Scorer重的DocIdSet 封装成 RoaringDocIdSet, 该方法 有LRUQueryCache 提供。

 

<2> LRUQueryCache 的 cacheImpl 方法

 

public class LRUQueryCache implements QueryCache, Accountable {
  // 。。。。。。

  protected DocIdSet cacheImpl(BulkScorer scorer, int maxDoc) throws IOException {
    final RoaringDocIdSet.Builder builder = new RoaringDocIdSet.Builder(maxDoc);
    scorer.score(new LeafCollector() {

      @Override
      public void setScorer(Scorer scorer) throws IOException {}

      @Override
      public void collect(int doc) throws IOException {
        builder.add(doc);
      }

    }, null);
    return builder.build();
  }

  // 。。。。。。
}

 

说明:很简单,不解释了。

 
【3】同步到LRUQueryCache中供 下次使用:主要是调用:LRUQueryCache.putInAbsent()

 

public class LRUQueryCache implements QueryCache, Accountable {
  // 。。。。。。

  synchronized void putIfAbsent(Query query, LeafReaderContext context, DocIdSet set) {
  	// 。。。。。。

  	// 向uniqueQueries中设置一个
    Query singleton = uniqueQueries.get(query);
    if (singleton == null) {
      uniqueQueries.put(query, query);
      onQueryCache(singleton, LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY + ramBytesUsed(query));
    } else {
      query = singleton;
    }

    // 获得该IndexReader 实例的 LeafCache ,里面缓存了 Query对应的 DocIdSet
    final Object key = context.reader().getCoreCacheKey();
    LeafCache leafCache = cache.get(key);
    if (leafCache == null) {
      leafCache = new LeafCache(key);
      final LeafCache previous = cache.put(context.reader().getCoreCacheKey(), leafCache);
      ramBytesUsed += HASHTABLE_RAM_BYTES_PER_ENTRY;
      assert previous == null;
      // we just created a new leaf cache, need to register a close listener
      context.reader().addCoreClosedListener(new CoreClosedListener() {
        @Override
        public void onClose(Object ownerCoreCacheKey) {
          clearCoreCacheKey(ownerCoreCacheKey);
        }
      });
    }

    //插入 DocIdSet 
    leafCache.putIfAbsent(query, set);
    evictIfNecessary();
  }

  // 。。。。。。
}

 

说明:

这里还是填充get() 方法 使用的三大数据结构:Map<Query, Query> uniqueQueries,Set<Query> mostRecentlyUsedQueries,Map<Object, LeafCache> cache ,只是走的反向流程,他简单,不解释了,中间还更新过内存使用,以及evictIfNecessary()缓存维护,咱不解释.

 

至此缓存的时候全部结束了。

 

四 缓存策略:QueryCachingPolicy

1)概述:

LRUQueryCache 处理基础的判断外,还需要使用QueryCachingPolicy 接口提供的缓冲策略,已确定具体执行时的缓存机制。

默认是 UsageTrackingQueryCachingPolicy + CacheOnLargeSegments ; 

 

Lucene 中:可以通过使用 ALWAYS_CACHE 来指定不限,都缓存。

ElasticSearch 中: 可以通过配置 index.queries.cache.everything:true 指定。

 

类结构如下:


 

2)Lucene 缓存策略说明:

(1)QueryCachingPolicy 接口:高层定义策略

 

public interface QueryCachingPolicy {
  
  void onUse(Query query);

  boolean shouldCache(Query query, LeafReaderContext context) throws IOException;
}

 

重点:shouldCache() 方法,指定了这个Query的结果是否应该被缓存

 

(2)缓存所有策略

 

  public static final QueryCachingPolicy ALWAYS_CACHE = new QueryCachingPolicy() {

    @Override
    public void onUse(Query query) {}

    @Override
    public boolean shouldCache(Query query, LeafReaderContext context) throws IOException {
      return true;
    }

  };

 

 

(3)CacheOnLargeSegments: 大Segment 缓存策略

 

  public static class CacheOnLargeSegments implements QueryCachingPolicy {

    public static final CacheOnLargeSegments DEFAULT = new CacheOnLargeSegments(10000, 0.03f);

    private final int minIndexSize;
    private final float minSizeRatio;

  	// 。。。。。。

    @Override
    public void onUse(Query query) {}

    @Override
    public boolean shouldCache(Query query, LeafReaderContext context) throws IOException {
      final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);

      // 索引文档数 > 指定阈值 minIndexSize (默认1000)
      if (topLevelContext.reader().maxDoc() < minIndexSize) {
        return false;
      }

      // segment 的document数量 占 总文档数的比例超过 指定阈值 (默认 0.3)
      final float sizeRatio = (float) context.reader().maxDoc() / topLevelContext.reader().maxDoc();
      return sizeRatio >= minSizeRatio;
    }

  };

 

 

 总结:可缓存要求:

 a)索引库文档数 大于 指定阈值 minIndexSize (默认10000)

 b)segment 的document数量 占 总文档数的比例超过 指定阈值 (默认 0.3)

 

(4)UsageTrackingQueryCachingPolicy :使用跟踪的缓存策略

 

public final class UsageTrackingQueryCachingPolicy implements QueryCachingPolicy {
  // 。。。。。。

  private final QueryCachingPolicy.CacheOnLargeSegments segmentPolicy;
  private final FrequencyTrackingRingBuffer recentlyUsedFilters;

  public UsageTrackingQueryCachingPolicy() {
    this(QueryCachingPolicy.CacheOnLargeSegments.DEFAULT, 256);
  }

  // 。。。。。。

  @Override
  public boolean shouldCache(Query query, LeafReaderContext context) throws IOException {

  	// MatchAllDocsQuery ,MatchNoDocsQuery 不缓存
    // BooleanQuery 没有查询条件不缓存

    // 。。。。。。
    
    // CacheOnLargeSegments 策略是否允许
    if (segmentPolicy.shouldCache(query, context) == false) {
      return false;
    }

    //从 recentlyUsedFilters 中 获取该Query的最近查询次数
    final int frequency = frequency(query);
    //获取 该 Query 类型对应的最小缓存使用频次值
    final int minFrequency = minFrequencyToCache(query);
    //该 Query 最近访问次数 > 该查询类型的缓存最小访问次数要求,则允许缓存
    return frequency >= minFrequency;
  }

  int frequency(Query query) {
    assert query instanceof BoostQuery == false;
    assert query instanceof ConstantScoreQuery == false;

    int hashCode = query.hashCode();

    synchronized (this) {
      return recentlyUsedFilters.frequency(hashCode);
    }
  }

  protected int minFrequencyToCache(Query query) {
    if (isCostly(query)) {
      // 高成本查询:MultiTermQuery,MultiTermQueryConstantScoreWrapper,需要超过2次
      return 2;
    } else if (isCheap(query)) {
      // 低成本查询,TermQuery 需要超过 20次
      return 20;
    } else {
      // 默认超过5次
      return 5;
    }
  }

  static boolean isCostly(Query query) {
    return query instanceof MultiTermQuery ||
        query instanceof MultiTermQueryConstantScoreWrapper;
  }

  static boolean isCheap(Query query) {
    return query instanceof TermQuery;
  }
}

 

总结:可缓存要求:

a)Query类型 非 MatchAllDocsQuery ,MatchNoDocsQuery 

b)Query类型 非 不带条件的 BooleanQuery

c)CacheOnLargeSegments 允许缓存策略,见上面(3)CacheOnLargeSegments: 大Segment 缓存策略

d)查看该Query最近256次查询中出现的次数是否超过类型对应的最小访问次数值

(i)高成本Query:MultiTermQuery,MultiTermQueryConstantScoreWrapper:最少2次

(ii)低成本Query:TermQuery:最小20次

(ii)其他:最小5次

 

 

  • 大小: 324.1 KB
  • 大小: 198.1 KB
  • 大小: 197.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics