`
liguanqun811
  • 浏览: 22861 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

guava cache 原理

 
阅读更多

Guava cache是一个支持高并发的线程安全的本地缓存。多线程情况下也可以安全的访问或者更新cache。这一切都是借鉴了ConcurrentHashMap的结果,但是,guava cache 又有自己的特性:

  1. automatic loading of entries into the cache

    当cache中不存在要查找的entry的时候会自动执行用户自定义的加载逻辑,加载成功后将entry存入缓存并返回给用户(获取过程中调用方需要等待加载完成)未过期的entry,如果不存在或者已过期,则需要load,为防止多线程并发下重复加载,需要先锁定,获得加载资格的线程(获得锁的线程)创建一个LoadingValueRefrerence并放入map中,其他线程等待结果返回。

示例
@Test
public void testLoad() throws Exception{
    LoadingCache<String,Object> loadingCache=CacheBuilder.newBuilder().build(new CacheLoader<String, Object>() {
        @Override
        public Object load(String key) throws Exception {
            System.out.println("load value for key:"+key);
            return key+"";
        }
    });
    //第一次查询会执行cache的load方法,第二次直接从缓存获取
    System.out.println(loadingCache.get("key1"));
    System.out.println(loadingCache.get("key1"));
}
  1. least-recently-used eviction when a maximum size is exceeded

    如果指定了cache的最大容量,那么当触及容量阈值的时候会根据 LRU策略自动删除entry

@Test
public void testEviction() throws Exception{
    //设置cache的最大size为5,并监听删除事件,一旦有缓存数据被删除则会通知监听
    Cache<String,Object> cache=CacheBuilder.newBuilder().maximumSize(5).removalListener(notification -> {
        System.out.println(notification.getValue() + " is removed");
    }).build();
    //放入5个对象
    for(int i=0;i<5;i++){
        cache.put("key"+i,i);
    }
    //放入第6个对象,此时缓存已满,会根据LRU策略删除最早的数据
    cache.put("key"+10,10);
}
  1. time-based expiration of entries, measured since last access or last write

    如果指定了entry的最大存活时间,那么如果该entry从最后一次活跃时间到最大存活时间内没有再活跃过,接下来就会被从cache中删除,如果用户注册了RemoveLienster,将会同时接收到数据被移除的通知

@Test
public void testTimeBasedEviction() throws Exception{
    //写入30秒后过期,并监听删除事件,一旦有缓存数据被删除则会通知监听
    Cache<String,Object> cache=CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS)
            //为了查看过期效果,这里讲并发数改为了1?
            .concurrencyLevel(1)
            .removalListener(notification -> {
                System.out.println(notification.getValue() +
                        " is removed at:"+System.currentTimeMillis()/1000);
            }).build();
    //每隔10十秒一个对象,共放入5个对象,放入第4个对象时因为第1个已经过期则会删除第0个对象
    for(int i=0;i<=4;i++){
        System.out.println("put key"+i+":"+i+" at:"+System.currentTimeMillis()/1000);
        cache.put("key" + i, i);
        Thread.sleep(12000);
    }
}
  1. keys automatically wrapped in weak references

    cache中的key可以被包装成弱引用。弱引用的好处就是当jvm执行gc的时候会自动将该entry回收。如果一个对象只有WeakReference对象引用它,那么就可能被gc掉。gc掉后WearkReference对象会进入ReferenceQueue队列。cache会利用RQ队列将entry移除掉。

@Test
public void testWeakKeys() throws Exception {
    LoadingCache<String, String> loadingCache = CacheBuilder.newBuilder().concurrencyLevel(1)
            //缓存key使用WeakReference包装,一旦gc执行则回收该key,guava会自动移除key所对应的entry
            .weakKeys().removalListener(notification -> {
                //注册监听,打印被回收的key对应的value
                //此时打印的key为null(因为被gc掉了),value为实际值
                System.out.println("remove " + notification.getKey()+":"+notification.getValue());
            }).build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    System.out.println("load key:" + key);
                    return "v("+key.toString()+")";
                }
            });
    int i = 0;
    while (true) {
        //通知jvm需要执行gc
        if(i%10==0)
            System.gc();
        Thread.sleep(1000);
        loadingCache.get("bird" + (i++) + "");
    }
}
  1. values automatically wrapped in weak or soft references

    cache中的value也可以被自动包装成弱引用或者软引用

@Test
public void testWeakValues() throws Exception {
    LoadingCache<String, String> loadingCache = CacheBuilder.newBuilder().concurrencyLevel(1)
            //缓存value使用WeakReference包装,一旦gc执行则回收该value,guava会自动移除value所对应的entry
            .weakValues().removalListener(notification -> {
                //注册监听,打印被回收的key对应的value
                //此时打印的key为实际值,value为null(因为被gc掉了)
                System.out.println("remove " + notification.getKey()+":"+notification.getValue());
            }).build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    System.out.println("load key:" + key);
                    return "v("+key.toString()+")";
                }
            });
    int i = 0;
    while (true) {
        //通知jvm需要执行gc
        if(i%10==0)
            System.gc();
        Thread.sleep(1000);
        loadingCache.get("bird" + (i++) + "");
    }
}
  1. accumulation of cache access statistics

    cache支持key命中或者不命中的统计

@Test
public void testStat() throws Exception {
    Cache<String,String> cache=CacheBuilder.newBuilder()
            //开启统计功能
            .recordStats()
            .maximumSize(15).build();
    for(int i=0;i<20;i++)
        cache.put(i+"",i+"");

    for(int i=0;i<30;i++)
        cache.getIfPresent(i+"");
    //输出:
    //CacheStats{hitCount=15, missCount=15, loadSuccessCount=0,
    // loadExceptionCount=0, totalLoadTime=0, evictionCount=5}
    System.out.println(cache.stats().toString());
}

Guava cache 的数据结构

  1. Guava cache的数据结构与concurrentHashMap类似,都是由Segment数组构成。Segement数组的长度决定了cache的并发数,这是因为每一个Segment使用了单独的锁,其实每一个Segment继承了ReentrantLock,对Segment的写操作需要先拿到锁
  2. 每个Segment持有一个AtomicReferenceArray对象。此对象中存储了一个个的 ReferenceEntry。每一个entry持有key和value,key的hash值,以及一个next指针。此数据结构是不是非常像Hashmap? 所以每个segment就类似于一个HashMap,Guava Cache 就是一组HashMap的集合。entry还有可能有另外的两个或者四个指针,一对指针用来组成writeQueue,一对指针用来组成 accessQueue。为什么说可能会有呢? 开始我们讲了guava cache 支持过期驱逐entry的特性, 如果给cache赋予了支持写入后经过某个时间自动过期的属性,那么ReferenceEntry 就会有两个指针,这两个指针构成了writeQueue队列,还有个writeTime属性记录写入的时间。 同样,如果赋予了支持访问后经过某个时间段自动过期的属性,也会有另外两个指针,这两个指针构成了accessQueue队列,还有个accessTime属性,记录访问时间。 如果以上两个属性都支持,那么就会需要四个指针了,如果以上两个属性都不支持,那么久不需要这四个指针了。 这两个队列都是访问时间最早的在对头,较晚的在队尾。正是这些队列的应用才实现了Cache的LRU功能
Guava cache的属性
  1. expireAfterWriteNanos: 在entry写入,或者最近的替换之后,一旦超过了该时间期限没有新的写入及替换,就会将该entry从cache移除,如果设置为0,则会禁用cache过期触发时机是,在写入或者读取的时候,对writeQueue 队列中的数据进行过期性检查
  2. expireAfterAccessNanos: 在entry写入,或者最近替换、或者最近访问之后的 该指定时间段内没有再次写入,替换或者访问,就会将该entry从cache移除,如果该值设置为0,则相当于禁用cache,每次put之后就会立即过期。
  3. maximumSize: 指定整个cache的最大容量,一旦超过该容量则会引发enrty的驱逐。根据lru策略驱逐,如果该值设置为0,则相当于禁用cache,每次put之后会立即过期并被移除掉
  4. maximumWeight:指定这个cache的最大重量。该属性不能和
  5. maximumSize同时使用。如果该值被设置为0,则相当于禁用cache功能,每次put之后会立即过期并被移除。需要配合Weiger对对象使用。Weigher用于计算每一个entry的重量。
  6. writeQueue和accessQueue 都是双向链表队列,他们的目的就是维护entry写入或者访问的顺序,以便于之后的lru驱逐策略。每个segment都拥有各自的queue,因此,guava cache并不是绝对的lru,而是在各自的segment内lru,因为每一个segment使用独立queue。如果想使用严格的lru的话,只能将并发level设置为1了。 关于writequeue和accessqueue两个队列基于一个事实认定:在segment的数组中的元素必然也会在queue中,在queue中的元素也必然会在segment的数组中。
  7. writeQueue 设置了expireAfterWriteNanos参数才会启用该队列。每次新写入或者更新entry都会加入队列尾部。如果对原来的值做更新操作,则会将原来writeQueue队列中的entry摘下来,挂到队列尾部,并更新writeTime。
  8. accessQueue 设置了expireAfterAccess参数或者maxWeight参数才会启用该队列。每次写入或者更新entry都会加入到队列尾部,如果对原来的值做更新操作,则会将原来accessQueue 队列中的entry摘下来,挂到队列尾部,并更新accessTime。
  9. recencyQueue 启用条件和accessQueue一样。每次访问操作都会将该entry加入到队列尾部,并更新accessTime。如果遇到写入操作,则将该队列内容排干,如果accessQueue队列中持有该这些 entry,然后将这些entry add到accessQueue队列。注意,因为accessQueue是非线程安全的,所以如果每次访问entry时就将该entry加入到accessQueue队列中,就会导致并发问题。所以这里每次访问先将entry临时加入到并发安全的ConcurrentLinkedQueue队列中,也就是recencyQueue中。在写入的时候通过加锁的方式,将recencyQueue中的数据添加到accessQueue队列中。 如此看来,recencyQueue是为 accessQueue服务的。以便高效的实现expireAfterAccess功能。 关于使用recencyQueue的好处:get的时候使用并发工具ConcurrentLinkedQueue队列添加entry,而不用lock(),一个是无阻赛锁一个是阻塞锁。 https://github.com/google/guava/issues/1487
Cache的构造

cahce的构造是典型的Builder模式

建造者模式定义: 将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示。 建造者模式包括的角色:

  1. Builder:给出一个抽象接口或抽象类,以规范产品的建造。这个接口规定要实现复杂对象的哪些部分的创建,并不涉及具体的对象部件的创建,一般由子类具体实现。
  2. ConcreteBuilder:Builder接口的实现类,并返回组建好对象实例。
  3. Director:调用具体建造者来创建复杂对象的各个部分,在指导者中不涉及具体产品的信息,只负责保证对象各部分完整创建或按某种顺序创建。
  4. Product:要创建的复杂对象,产品类。

在Guava cache中,product是LocalCache,Builder 是CacheBuilder,CacheBuilder拥有LocalCache类似的属性以便用来构造LocalCache,而LocalCache继承了AbstractMap。

看下LocalCache的构造函数

LocalCache(CacheBuilder builder, CacheLoader loader) {
     //默认并发数为4
    concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
    
    //cache支持不同强度的Reference,为便于对不同强度的key及value包装抽象出了
    //Strength类
    keyStrength = builder.getKeyStrength();
    valueStrength = builder.getValueStrength();
    
    //根据key取value时使用,用来判断两个key是否相等
    keyEquivalence = builder.getKeyEquivalence();
    valueEquivalence = builder.getValueEquivalence();
    
    //还可以设置cache的总重量,若超重则会驱逐最早的entry
    //若未指定称重器的话,每个entry的重量默认为1,此时
    //cache最大重量与cache最大元素个数相等
    maxWeight = builder.getMaximumWeight();
    //称重器,衡量每个entry的重量
    weigher = builder.getWeigher();
    
    expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
    expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
    refreshNanos = builder.getRefreshNanos();
    
    //监听器,监听entry的删除并执行相应的操作
    removalListener = builder.getRemovalListener();
    removalNotificationQueue = (removalListener == NullListener.INSTANCE)
        ? LocalCache.<RemovalNotification<K, V>>discardingQueue()
        : new ConcurrentLinkedQueue<RemovalNotification<K, V>>();

    ticker = builder.getTicker(recordsTime());
    //entry工厂,创建不同Reference的entry
    entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
    
    globalStatsCounter = builder.getStatsCounterSupplier().get();
    defaultLoader = loader;
    
    //设置cache的初始容量
    int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
    if (evictsBySize() && !customWeigher()) {
      initialCapacity = Math.min(initialCapacity, (int) maxWeight);
    }

    //设置合适的并发数,应该是2的n次方,且大于等于用户指定的并发数
    //例如用户指定了concurrentLevel=9,那么segmentCount=16
    int segmentShift = 0;
    int segmentCount = 1;
    while (segmentCount < concurrencyLevel
           && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
      ++segmentShift;
      segmentCount <<= 1;
    }
    this.segmentShift = 32 - segmentShift;
    segmentMask = segmentCount - 1;
    
    //看,这里创建了上图中的Segment数组
    this.segments = newSegmentArray(segmentCount);
    
    //设置每个segment中数组的大小,保整每个segmentSize大小为
    //2的n次方,总的segmentSize大于等于initialCapacity
    int segmentCapacity = initialCapacity / segmentCount;
    if (segmentCapacity * segmentCount < initialCapacity) {
      ++segmentCapacity;
    }

    int segmentSize = 1;
    while (segmentSize < segmentCapacity) {
      segmentSize <<= 1;
    }
    
    //如果指定了最大容量,则计算每个Segment的最大容量
    //确保所有Segment容量等于用户指定的最大数
    if (evictsBySize()) {
      // Ensure sum of segment max weights = overall max weights
      long maxSegmentWeight = maxWeight / segmentCount + 1;
      long remainder = maxWeight % segmentCount;
      for (int i = 0; i < this.segments.length; ++i) {
        if (i == remainder) {
          maxSegmentWeight--;
        }
        this.segments[i] =
            createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
      }
    } else {
      for (int i = 0; i < this.segments.length; ++i) {
        this.segments[i] =
            createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
      }
    }
  }

创建Segment

Segment<K, V> createSegment(
  int initialCapacity, long maxSegmentWeight, StatsCounter statsCounter) {
return new Segment<K, V>(this, initialCapacity, maxSegmentWeight, statsCounter);
}
Segment(LocalCache<K, V> map, int initialCapacity, long maxSegmentWeight,
    StatsCounter statsCounter) {
  this.map = map;
  //设置sgement的最大容量
  this.maxSegmentWeight = maxSegmentWeight;
  this.statsCounter = checkNotNull(statsCounter);
  initTable(newEntryArray(initialCapacity));

  keyReferenceQueue = map.usesKeyReferences()
       ? new ReferenceQueue<K>() : null;

  valueReferenceQueue = map.usesValueReferences()
       ? new ReferenceQueue<V>() : null;

  recencyQueue = map.usesAccessQueue()
      ? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>()
      : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

  writeQueue = map.usesWriteQueue()
      ? new WriteQueue<K, V>()
      : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

  accessQueue = map.usesAccessQueue()
      ? new AccessQueue<K, V>()
      : LocalCache.<ReferenceEntry<K, V>>discardingQueue();
}
//构建Segment的内部数组容器
AtomicReferenceArray<ReferenceEntry<K, V>> newEntryArray(int size) {
  return new AtomicReferenceArray<ReferenceEntry<K, V>>(size);
}

Guava cache的常用操作解析

put操作
@Override
  public V putIfAbsent(K key, V value) {
    checkNotNull(key);
    checkNotNull(value);
    //对key hash
    int hash = hash(key);
    //定位到key所在的segment然后执行put
    return segmentFor(hash).put(key, hash, value, true);
  }
V put(K key, int hash, V value, boolean onlyIfAbsent) {
  //写操作需要获取此Segment对象的锁
  lock();
  try {
    long now = map.ticker.read();
    //首先清除ReferenceQueue队列中entry
    //然后清除过期的entry
    preWriteCleanup(now);
    
    int newCount = this.count + 1;
    //如果Segment中Entry数量超出阈值,则扩容
    if (newCount > this.threshold) { // ensure capacity
      expand();
      newCount = this.count + 1;
    }

    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    //计算出下标
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);

    // 如果key已经存在了,进入for循环内部执行
    for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        // We found an existing entry.

        ValueReference<K, V> valueReference = e.getValueReference();
        V entryValue = valueReference.get();

        if (entryValue == null) {
          ++modCount;
          //如果原来的valueRef有效,则更新value值
          if (valueReference.isActive()) {
            enqueueNotification(key, hash, valueReference, RemovalCause.COLLECTED);
            setValue(e, key, value, now);
            newCount = this.count; // count remains unchanged
          } else {
            //如果原来的valueReference失效,则添加个新的entry
            setValue(e, key, value, now);
            newCount = this.count + 1;
          }
          this.count = newCount; // write-volatile
          evictEntries();
          return null;
        } else if (onlyIfAbsent) {
          // Mimic
          // "if (!map.containsKey(key)) ...
          // else return map.get(key);
          recordLockedRead(e, now);
          return entryValue;
        } else {
          // 若原来的值存在,则更新,同时生成一个驱逐事件并加入到通知队列
          ++modCount;
          enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);
          setValue(e, key, value, now);
          evictEntries();
          return entryValue;
        }
      }
    }
    //此时,表明key原先不存在,需要创建一个新的entry
    // Create a new entry.
    ++modCount;
    ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
    //设置entry的value
    setValue(newEntry, key, value, now);
    //将entry挂到Segment的数组里
    table.set(index, newEntry);
    newCount = this.count + 1;
    this.count = newCount; // write-volatile
    //新添加了一个entry,当然要看下缓存是否满了,需要执行清理操作释放空间
    evictEntries();
    return null;
  } finally {
    unlock();
    //解锁以后,触发移除通知事件
    postWriteCleanup();
  }
}

看下是如何删除entry的

void postWriteCleanup() {
  runUnlockedCleanup();
}
void runLockedCleanup(long now) {
  if (tryLock()) {
    try {
      //先清空ReferenceQueue队列中的key或者value对应对的entry
      drainReferenceQueues();
      expireEntries(now); // calls drainRecencyQueue
      readCount.set(0);
    } finally {
      unlock();
    }
  }
}

void drainReferenceQueues() {
  if (map.usesKeyReferences()) {
    //清空key对应的entry
    drainKeyReferenceQueue();
  }
  if (map.usesValueReferences()) {
    //清空value对应的entry
    drainValueReferenceQueue();
  }
}

void drainKeyReferenceQueue() {
  Reference<? extends K> ref;
  int i = 0;
  while ((ref = keyReferenceQueue.poll()) != null) {
    @SuppressWarnings("unchecked")
    ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
    map.reclaimKey(entry);
    if (++i == DRAIN_MAX) {
      break;
    }
  }
}

void expireEntries(long now) {
  drainRecencyQueue();
  ReferenceEntry<K, V> e;
  while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
  while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
}
  //判断一个entry是否过期  
  boolean isExpired(ReferenceEntry<K, V> entry, long now) {
    checkNotNull(entry);
    if (expiresAfterAccess()
        && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
      return true;
    }
    if (expiresAfterWrite()
        && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
      return true;
    }
    return false;
  }

Entry中持有key和value,根据key的强度,Entry分为StrongEntry强引用Entry 和WeakEntry弱引用Entry。而根据value的强度,分为强引用StrongValueReference、软引用SoftValueReference和弱引用WeakValueReferece。根据不同的key或者value强度,创建不同的Entry和valueReference。Strength对象负责创建不同强度的ValueReference。

get操作
public V get(@Nullable Object key) {
    if (key == null) {
      return null;
    }
    int hash = hash(key);
    //哈希定位到Segment,然后调用segment.get()
    return segmentFor(hash).get(key, hash);
  }
  
  V get(Object key, int hash) {
  try {
    //快速失败
    if (count != 0) { // read-volatile
      long now = map.ticker.read();
      //查询未过期的entry
      ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
      if (e == null) {
        return null;
      }

      V value = e.getValueReference().get();
      if (value != null) {
        recordRead(e, now);
        //如果指定了定时刷新,则尝试刷新value
        return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
      }
      tryDrainReferenceQueues();
    }
    return null;
  } finally {
    postReadCleanup();
  }
}
  • 大小: 14.5 KB
  • 大小: 36.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics