`
Donald_Draper
  • 浏览: 950845 次
社区版块
存档分类
最新评论

netty Pooled字节buf分配器

阅读更多
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
netty Unpooled字节buf分配器:[url]http://donald-draper.iteye.com/blog/2394619[/url
引言:
上一篇文章我们看了,Unpooled字节buf分配器,先来回顾一下:
非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
今天来看一下Pooled字节buf非配器,不做深入研究,仅仅窥探一下,由于本人当前能力有限,只能简单看一下,由于Pooled字节分配器可能涉及到很多内存相关的概念,可以参考一下连接:

深入浅出Netty内存管理PoolChunk:http://blog.jobbole.com/106001/
Netty4 中的内存管理:http://www.cnblogs.com/ungshow/p/3541737.html
Netty5源码学习之buffer篇(一)PooledHeapByteBuf :https://yq.aliyun.com/articles/55623
Netty系列之Netty百万级推送服务设计要点:http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points/

关于内存分配策略可以参考这篇文章,
http://www.360doc.com/content/13/0915/09/8363527_314549128.shtml
现在对内存这段研究的不交少,有兴趣的可以先在国内搜索一些内存的相关策略,
在去国外搜索具体内存分配策略相关的论文或专业解析,由于本人还没有深入到操作系统分配这一块,
有时间研究一下,希望这不是接口。
下面两篇文章是具体的应用和内存分配策略比较,虽然有点粗糙,重要的是我们要吸收精华部分,
扯远了,扯了蛋了,疼...
浅谈redis采用不同内存分配器tcmalloc和jemalloc:http://www.jb51.net/article/100575.htm
jemalloc优化MySQL、Nginx内存管理:https://blog.linuxeye.cn/356.html

PooledByteBufAllocator:分配heap、direct buffer
PoolArena:一块逻辑上的内存池,用来管理和组织buffer的,内部数据结构较复杂。
PoolChunk: 管理实际的底层内存,内部已内存Page组成
默认情况下,Page的大小为4KB,有三类,small、large和huge。small类的内存请求都属于一个内存页之内 。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。

看了上面的文章,简单理一下,我们使用内存,实际为机器内存的Memory Mapping Region区域,
PoolArena可以理解为mmap中内存分配区,分配区由内存块PoolChunk组成,内存块以内存Page管理内存,Page的大小为4KB,有三类,small、large和huge。small类的内存请求都属于一个内存页之内 。另外,在small类里面,又分了三个子类,分别是Tiny、Quantum-Spaced和Sub-page。

来看Pooled 字节buf分配器
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    private static final int DEFAULT_NUM_HEAP_ARENA;
    private static final int DEFAULT_NUM_DIRECT_ARENA;

    private static final int DEFAULT_PAGE_SIZE;
    private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
    private static final int DEFAULT_TINY_CACHE_SIZE;//默认 tiny buf 缓存size
    private static final int DEFAULT_SMALL_CACHE_SIZE;//默认 small buf 缓存size
    private static final int DEFAULT_NORMAL_CACHE_SIZE;//默认正常buf 缓存size
    private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
    private static final int DEFAULT_CACHE_TRIM_INTERVAL;
    private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;//是否为所有线程使用buf缓存
    private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;

    private static final int MIN_PAGE_SIZE = 4096;
    private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);

    static {
        //获取默认内存页size
        int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
        Throwable pageSizeFallbackCause = null;
        try {
            validateAndCalculatePageShifts(defaultPageSize);
        } catch (Throwable t) {
            pageSizeFallbackCause = t;
            defaultPageSize = 8192;
        }
        DEFAULT_PAGE_SIZE = defaultPageSize;

        int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
        Throwable maxOrderFallbackCause = null;
        try {
            validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
        } catch (Throwable t) {
            maxOrderFallbackCause = t;
            defaultMaxOrder = 11;
        }
        DEFAULT_MAX_ORDER = defaultMaxOrder;

        // Determine reasonable default for nHeapArena and nDirectArena.
        // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
        final Runtime runtime = Runtime.getRuntime();

        /*
         * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
         * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
         * allocation and de-allocation needs to be synchronized on the PoolArena.
         *
         * See https://github.com/netty/netty/issues/3888.
         */
	 //最小内存分配区的数量,默认最小缓冲buf数量为处理器的2倍
        final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numHeapArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                runtime.maxMemory() / defaultChunkSize / 2 / 3)));
        DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numDirectArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

        // cache sizes 默认tiny为512,small为256,normal为64
        DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
        DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
        DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

        // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
        // 'Scalable memory allocation using jemalloc',默认最大缓存容量为32kb
        DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
                "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);

        // the number of threshold of allocations when cached entries will be freed up if not frequently used
	//分配次数阈值,可以可以理解当缓存多久不用时,释放
        DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
                "io.netty.allocator.cacheTrimInterval", 8192);
        //默认开启线程buf缓存
        DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
                "io.netty.allocator.useCacheForAllThreads", true);

        DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
                "io.netty.allocator.directMemoryCacheAlignment", 0);

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
            logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
            if (pageSizeFallbackCause == null) {
                logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
            } else {
                logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
            }
            if (maxOrderFallbackCause == null) {
                logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
            } else {
                logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
            }
            logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
            logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
            logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
            logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
            logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
            logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
            logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
        }
    }
    public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

    private final PoolArena<byte[]>[] heapArenas;//堆buf分配区
    private final PoolArena<ByteBuffer>[] directArenas;//direct buf缓存分配区
    private final int tinyCacheSize;//tiny buf 缓存size
    private final int smallCacheSize;//small buf 缓存size   
    private final int normalCacheSize;//正常buf 缓存size    
    private final List<PoolArenaMetric> heapArenaMetrics;//堆buf分配区度量器
    private final List<PoolArenaMetric> directArenaMetrics;//direct buf分配区度量器
    private final PoolThreadLocalCache threadCache;//线程本地字节buf缓存
    private final int chunkSize;//分配区内存块size
    private final PooledByteBufAllocatorMetric metric;//buf 分配器,度量器
    public PooledByteBufAllocator() {
        this(false);
    }

    @SuppressWarnings("deprecation")
    public PooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
    }
    ...
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        super(preferDirect);
	//线程buf 缓存为PoolThreadLocalCache
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        if (nHeapArena < 0) {
            throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
        }
        if (nDirectArena < 0) {
            throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
        }

        if (directMemoryCacheAlignment < 0) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: >= 0)");
        }
        if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
            throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
        }

        if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: power of two)");
        }

        int pageShifts = validateAndCalculatePageShifts(pageSize);
         
        if (nHeapArena > 0) {
	   //创建堆buf 分配区
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
	        //包装pool缓存为PoolArena.HeapArena
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
		//添加pool 堆buf 分配区到堆分配区度量集
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }

        if (nDirectArena > 0) {
	    //创建direct buf 分配区
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
	        //包装pool缓存为PoolArena.DirectArena
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
		//添加pool direct buf 分配区到堆分配区度量集
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        metric = new PooledByteBufAllocatorMetric(this);
    }
}

//创建内存分配区
 @SuppressWarnings("unchecked")
 private static <T> PoolArena<T>[] newArenaArray(int size) {
     return new PoolArena[size];
}

从上面可以看出Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区,每个Region的内存块size为chunkSize,每个内存块内存页大小,默认为8k。
来看创建堆buf:

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    final ByteBuf buf;
    if (heapArena != null) {
       //从堆bu分配区,创建一个堆buf 
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    //追踪字节buf资源泄漏情况
    return toLeakAwareBuffer(buf);
}


来看从堆分配区获取堆buf,PoolArena同时为Pool buf分配区量器,获取buf,实际是从PoolThreadCache中获取
abstract class PoolArena<T> implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
    ...
     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建Pooled buf
        allocate(cache, buf, reqCapacity);//从缓冲获取堆buf
        return buf;
    }
    //创建Pooled buf,待子类扩展
    protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
    //从缓冲获取堆buf
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
        //容量小于页size,即tiny 或small buf
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;//sub page pool

            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
            ...
        }
	//正常bufsize
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } 
	...
    }
    //堆buf分配区
     static final class HeapArena extends PoolArena<byte[]> {

        HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
                int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
            super(parent, pageSize, maxOrder, pageShifts, chunkSize,
                    directMemoryCacheAlignment);
        }
	//创建Pooled堆buf
	 @Override
        protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
            return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
                    : PooledHeapByteBuf.newInstance(maxCapacity);
        }
	...
    }
    //direct buf 缓存
     static final class DirectArena extends PoolArena<ByteBuffer> {

        DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
                int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
            super(parent, pageSize, maxOrder, pageShifts, chunkSize,
                    directMemoryCacheAlignment);
        }
	//创建Pooled Direct buf
	 @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        
	}
      ...
      }
}

//PoolArenaMetric
public interface PoolArenaMetric {

    /**
     * Returns the number of thread caches backed by this arena.
     返回缓存此arena的线程数
     */
    int numThreadCaches();
    /**
     * Returns the number of tiny sub-pages for the arena.
     tiny sub page数量
     */
    int numTinySubpages();
    /**
     * Returns the number of small sub-pages for the arena.
     small sub page数量
     */
    int numSmallSubpages();
    /**
     * Returns the number of chunk lists for the arena.
     分配区,内存块数量
     */
    int numChunkLists();
    ...
 }

从上面来看,PoolArena根据容量来决定创建tiny,small还是Normal buf:
我们以Normal为例:
从线程本地缓存获取buf
//PoolThreadCache
final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    final PoolArena<byte[]> heapArena;//堆分配区
    final PoolArena<ByteBuffer> directArena;//direct buf分配区
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;//tiny subpage 堆缓存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;//tiny subpage direct缓存
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;//small subpage 堆缓存
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;//small subpage direct缓存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;//normal subpage 堆缓存
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;//normal subpage direct缓存

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;

    private final Thread deathWatchThread;
    private final Runnable freeTask;

    private int allocations;
    ...
    /**
     * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        //从内存域缓存,创建buf
        return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
    }
    //获取对应的缓存region
    private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
        if (area.isDirect()) {
            int idx = log2(normCapacity >> numShiftsNormalDirect);
            return cache(normalDirectCaches, idx);
        }
        int idx = log2(normCapacity >> numShiftsNormalHeap);
        return cache(normalHeapCaches, idx);
    }

    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
        if (cache == null || idx > cache.length - 1) {
            return null;
        }
        return cache[idx];
    }
    //从内存域缓存,创建buf
     @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        boolean allocated = cache.allocate(buf, reqCapacity);
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
        }
        return allocated;
    }
    
     //内存region缓存
     private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;
	/**
         * Allocate something out of the cache if possible and remove the entry from the cache.
         */
       public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            //从缓冲buf队列poll一个buf
             Entry<T> entry = queue.poll();
             if (entry == null) {
                 return false;
             }
             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
             entry.recycle();
         
             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
             ++ allocations;
             return true;
         }
         /**
          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
	  在
          */
         protected abstract void initBuf(PoolChunk<T> chunk, long handle,
                                             PooledByteBuf<T> buf, int reqCapacity);
      	...
          }
	}
	/**
         * Cache used for buffers which are backed by NORMAL size.
	 正常size buf的内存Region 缓存
         */
        private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
            NormalMemoryRegionCache(int size) {
                super(size, SizeClass.Normal);
            }
             //初始化Pooled字节buf
            @Override
            protected void initBuf(
                    PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
                chunk.initBuf(buf, handle, reqCapacity);
            }
        }
...
}



//PoolChunk,内存块
final class PoolChunk<T> implements PoolChunkMetric {

    private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

    final PoolArena<T> arena;//关联缓存池
    final T memory;
    final boolean unpooled;
    final int offset;

    private final byte[] memoryMap;
    private final byte[] depthMap;
    private final PoolSubpage<T>[] subpages;//内存页
    /** Used to determine if the requested capacity is equal to or greater than pageSize. */
    private final int subpageOverflowMask;
    private final int pageSize;//内存页size
    private final int pageShifts;
    private final int maxOrder;
    private final int chunkSize;
    private final int log2ChunkSize;
    private final int maxSubpageAllocs;
    /** Used to mark memory as unusable */
    private final byte unusable;

    private int freeBytes;

    PoolChunkList<T> parent;
    PoolChunk<T> prev;
    PoolChunk<T> next;
    //初始化Pooled字节buf
    void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
                 arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
    ...
}


//PooledByteBuf
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

    private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;

    protected PoolChunk<T> chunk;//内存块
    protected long handle;
    protected T memory;
    protected int offset;
    protected int length;
    int maxLength;
    PoolThreadCache cache;
    private ByteBuffer tmpNioBuf;
    private ByteBufAllocator allocator;
    ...
    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        init0(chunk, handle, offset, length, maxLength, cache);
    }

    void initUnpooled(PoolChunk<T> chunk, int length) {
        init0(chunk, 0, chunk.offset, length, length, null);
    }

    private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        this.chunk = chunk;
        memory = chunk.memory;
        allocator = chunk.arena.parent;
        this.cache = cache;
        this.handle = handle;
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
        tmpNioBuf = null;
    }
}

再来看分配其他两种分配tiny和small:
/**
 * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
 */
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

/**
 * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
 */
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}
//获取tiny内存域缓存
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}
//获取Small内存域缓存
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.smallIdx(normCapacity);
    if (area.isDirect()) {
        return cache(smallSubPageDirectCaches, idx);
    }
    return cache(smallSubPageHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}

/**
 * Cache used for buffers which are backed by TINY or SMALL size.
 */
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
    SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
        super(size, sizeClass);
    }

    @Override
    protected void initBuf(
            PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
        chunk.initBufWithSubpage(buf, handle, reqCapacity);
    }
}



再来看创建direct类型buf:
 @Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        //从direct分配区,分配一个direct buf
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    //追踪字节buf资源泄漏情况
    return toLeakAwareBuffer(buf);
}

这个思路与创建堆buf思路一致。

我们来简单看一Pooled 堆和direct buf

先来看堆分配区和direct分配区,分配buf
//堆buf分配区
 static final class HeapArena extends PoolArena<byte[]> {

    HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
            int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
        super(parent, pageSize, maxOrder, pageShifts, chunkSize,
                directMemoryCacheAlignment);
    }
	//创建Pooled堆buf
	 @Override
    protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
        return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
                : PooledHeapByteBuf.newInstance(maxCapacity);
    }
	...
}

//direct buf 分配区
 static final class DirectArena extends PoolArena<ByteBuffer> {

    DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
            int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
        super(parent, pageSize, maxOrder, pageShifts, chunkSize,
                directMemoryCacheAlignment);
    }
	//创建Pooled Direct buf
	 @Override
    protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
        if (HAS_UNSAFE) {
            return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
        } else {
            return PooledDirectByteBuf.newInstance(maxCapacity);
        }
    
     }
  ...
}

从上面可以看出Pool 堆buf为,PooledUnsafeHeapByteBuf、PooledHeapByteBuf
direct buf为PooledUnsafeDirectByteBuf、PooledDirectByteBuf。
我们分别来简单看一下这四种buf:

//PooledUnsafeHeapByteBuf
final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf {

    private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() {
        @Override//创建buf
        protected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {
            return new PooledUnsafeHeapByteBuf(handle, 0);
        }
    };

    static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {
        PooledUnsafeHeapByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        buf.reuse(maxCapacity);//重置Pooled字节buf
        return buf;
    }
    ....
}


//PooledHeapByteBuf
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {

    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        @Override//创建buf
        protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };

    static PooledHeapByteBuf newInstance(int maxCapacity) {
        PooledHeapByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        buf.reuse(maxCapacity);//重置Pooled字节buf
        return buf;
    }
    ...
}


//PooledUnsafeDirectByteBuf
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override//创建buf
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        buf.reuse(maxCapacity);//重置Pooled字节buf
        return buf;
    }
    ...
}


//PooledDirectByteBuf
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        @Override//创建buf
        protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    };

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();//从线程本地对象栈,获取buf对象
        buf.reuse(maxCapacity);//重置Pooled字节buf
        return buf;
    }
    ...
}



//PooledByteBuf
重用Pooled字节buf之前,必须调用#reuse方法
/**
 * Method must be called before reuse this {@link PooledByteBufAllocator}
 */
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);//重置引用计数器
    setIndex0(0, 0);//重置读写索引
    discardMarks();//丢弃读写索引标记
}


在简单看一下字节buf内存的回收器Recycler

/**
 * Light-weight object pool based on a thread-local stack.
 *
 * @param <T> the type of the pooled object
 */
public abstract class Recycler<T> {
   ...
   //线程本地对象栈
   private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);
        }
    };
    @SuppressWarnings("unchecked")
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
	//获取线程本地对象栈
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();//从对象栈中获取对象handle
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }
    static final class Stack<T> {
        final Recycler<T> parent;
        final Thread thread;
        final AtomicInteger availableSharedCapacity;
        final int maxDelayedQueues;

        private final int maxCapacity;
        private final int ratioMask;
        private DefaultHandle<?>[] elements;
        private int size;
        private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
	//创建对象Hanlde
	 DefaultHandle<T> newHandle() {
            return new DefaultHandle<T>(this);
        }
    }
    //创建对象
     protected abstract T newObject(Handle<T> handle);
    ...
}


从上面可以看出,Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将
对象放在一个线程本地栈中管理。



再来简单看一下线程本地buf缓存池:
//线程buf 缓存为PoolThreadLocalCache
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);


//线程本地buf缓存
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    private final boolean useCacheForAllThreads;

    PoolThreadLocalCache(boolean useCacheForAllThreads) {
        this.useCacheForAllThreads = useCacheForAllThreads;
    }
    //初始化线程本地buf缓存
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        
        if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
            return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        }
        // No caching for non FastThreadLocalThreads.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }
    //获取最少被线程使用的buf 缓存
     private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }
        PoolArena<T> minArena = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }

        return minArena;
    }
    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        threadCache.free();
    }

   
}


再来简单看一下buf缓存池度量器:
metric = new PooledByteBufAllocatorMetric(this);

/**
 * Exposed metric for {@link PooledByteBufAllocator}.
 */
@SuppressWarnings("deprecation")
public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
    private final PooledByteBufAllocator allocator;

    PooledByteBufAllocatorMetric(PooledByteBufAllocator allocator) {
        this.allocator = allocator;
    }
    /**
     * Return the number of heap arenas.
     返回堆缓存计数器
     */
    public int numHeapArenas() {
        return allocator.numHeapArenas();
    }
    /**
     * Return the number of direct arenas.
     返回direct缓存计数器
     */
    public int numDirectArenas() {
        return allocator.numDirectArenas();
    }
    /**
     * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
     堆buf缓存度量器
     */
    public List<PoolArenaMetric> heapArenas() {
        return allocator.heapArenas();
    }
    /**
     * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
     direct buf缓存度量器
     */
    public List<PoolArenaMetric> directArenas() {
        return allocator.directArenas();
    }
    /**
     * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
     使用此Pooled字节分配器的线程本地缓存数
     */
    public int numThreadLocalCaches() {
        return allocator.numThreadLocalCaches();
    }
    /**
     * Return the size of the tiny cache.
     tiny缓存大小
     */
    public int tinyCacheSize() {
        return allocator.tinyCacheSize();
    }
    /**
     * Return the size of the small cache.
     small缓存大小
     */
    public int smallCacheSize() {
        return allocator.smallCacheSize();
    }
    /**
     * Return the size of the normal cache.
     normal缓存大小
     */
    public int normalCacheSize() {
        return allocator.normalCacheSize();
    }
    /**
     * Return the chunk size for an arena.
     */
    public int chunkSize() {
        return allocator.chunkSize();
    }
    //堆内存使用量
    @Override
    public long usedHeapMemory() {
        return allocator.usedHeapMemory();
    }
    //direct内存使用量
    @Override
    public long usedDirectMemory() {
        return allocator.usedDirectMemory();
    }
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(256);
        sb.append(StringUtil.simpleClassName(this))
                .append("(usedHeapMemory: ").append(usedHeapMemory())
                .append("; usedDirectMemory: ").append(usedDirectMemory())
                .append("; numHeapArenas: ").append(numHeapArenas())
                .append("; numDirectArenas: ").append(numDirectArenas())
                .append("; tinyCacheSize: ").append(tinyCacheSize())
                .append("; smallCacheSize: ").append(smallCacheSize())
                .append("; normalCacheSize: ").append(normalCacheSize())
                .append("; numThreadLocalCaches: ").append(numThreadLocalCaches())
                .append("; chunkSize: ").append(chunkSize()).append(')');
        return sb.toString();
    }
}


再来看Pooledd字节分配器的其他方法:
//PooledByteBufAllocator
/**
 * Return the number of heap arenas.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}.
 */
@Deprecated
public int numHeapArenas() {
    return heapArenaMetrics.size();
}

/**
 * Return the number of direct arenas.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}.
 */
@Deprecated
public int numDirectArenas() {
    return directArenaMetrics.size();
}

/**
 * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}.
 */
@Deprecated
public List<PoolArenaMetric> heapArenas() {
    return heapArenaMetrics;
}

/**
 * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}.
 */
@Deprecated
public List<PoolArenaMetric> directArenas() {
    return directArenaMetrics;
}

/**
 * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}.
 */
@Deprecated
public int numThreadLocalCaches() {
    PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
    if (arenas == null) {
        return 0;
    }

    int total = 0;
    for (PoolArena<?> arena : arenas) {
        total += arena.numThreadCaches.get();
    }

    return total;
}

/**
 * Return the size of the tiny cache.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}.
 */
@Deprecated
public int tinyCacheSize() {
    return tinyCacheSize;
}

/**
 * Return the size of the small cache.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}.
 */
@Deprecated
public int smallCacheSize() {
    return smallCacheSize;
}

/**
 * Return the size of the normal cache.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}.
 */
@Deprecated
public int normalCacheSize() {
    return normalCacheSize;
}

/**
 * Return the chunk size for an arena.
 *
 * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}.
 */
@Deprecated
public final int chunkSize() {
    return chunkSize;
}

final long usedHeapMemory() {
    return usedMemory(heapArenas);
}

final long usedDirectMemory() {
    return usedMemory(directArenas);
}

private static long usedMemory(PoolArena<?>... arenas) {
    if (arenas == null) {
        return -1;
    }
    long used = 0;
    for (PoolArena<?> arena : arenas) {
        used += arena.numActiveBytes();
        if (used < 0) {
            return Long.MAX_VALUE;
        }
    }
    return used;
}

final PoolThreadCache threadCache() {
    return threadCache.get();
}

/**
 * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
 * and so should not called too frequently.
 */
public String dumpStats() {
    int heapArenasLen = heapArenas == null ? 0 : heapArenas.length;
    StringBuilder buf = new StringBuilder(512)
            .append(heapArenasLen)
            .append(" heap arena(s):")
            .append(StringUtil.NEWLINE);
    if (heapArenasLen > 0) {
        for (PoolArena<byte[]> a: heapArenas) {
            buf.append(a);
        }
    }

    int directArenasLen = directArenas == null ? 0 : directArenas.length;

    buf.append(directArenasLen)
       .append(" direct arena(s):")
       .append(StringUtil.NEWLINE);
    if (directArenasLen > 0) {
        for (PoolArena<ByteBuffer> a: directArenas) {
            buf.append(a);
        }
    }
    return buf.toString();
}

总结:
Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。


0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics