`
Donald_Draper
  • 浏览: 950905 次
社区版块
存档分类
最新评论

netty 抽象字节buf分配器

阅读更多
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
引言
上一篇文章,我们看了复合buf概念,先来回顾一下:

复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。

在netty 默认通道配置初始化(http://donald-draper.iteye.com/blog/2393504
这篇文章,我们看到字节buf是由字节分配器来分配的,默认字节分配器,在ByteBufUtil中定义,我们先回到
通道配置字节buf分配器的定义:
//字节buf分配器
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;


public final class ByteBufUtil {  
     static final ByteBufAllocator DEFAULT_ALLOCATOR;//默认字节分配器
     static {  
            String allocType = SystemPropertyUtil.get(  
                    "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");  
            allocType = allocType.toLowerCase(Locale.US).trim();  
            ByteBufAllocator alloc;  
            if ("unpooled".equals(allocType)) {  
                alloc = UnpooledByteBufAllocator.DEFAULT;  
                logger.debug("-Dio.netty.allocator.type: {}", allocType);  
            } else if ("pooled".equals(allocType)) {  
                alloc = PooledByteBufAllocator.DEFAULT;  
                logger.debug("-Dio.netty.allocator.type: {}", allocType);  
            } else {  
                alloc = PooledByteBufAllocator.DEFAULT;  
                logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);  
            }  
            DEFAULT_ALLOCATOR = alloc;  
      
            THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);  
            logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);  
      
            MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);  
            logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);  
        }  
    }  


从字节buf工具类,可以看出,如果系统属性io.netty.allocator.type,配置的为unpooled,
则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,
对于Android平台,默认为UnpooledByteBufAllocator。

//UnpooledByteBufAllocator
 public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {  
      
        private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();  
        private final boolean disableLeakDetector;  
      
        /** 
         * Default instance which uses leak-detection for direct buffers. 
         */  
        public static final UnpooledByteBufAllocator DEFAULT =  
                new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());  
            ...  
    }  


//PooledByteBufAllocator
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {  
     public static final PooledByteBufAllocator DEFAULT =  
                new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());  
           ...  
}  


由于上面两个方法继承了AbstractByteBufAllocator,我们先来看一下抽象字节buf分配器,

/**
 * Skeletal {@link ByteBufAllocator} implementation to extend.
 */
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    static final int DEFAULT_INITIAL_CAPACITY = 256;//初始化容量
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;//最大容量
    static final int DEFAULT_MAX_COMPONENTS = 16;//复合buf最大组件buf数量
    static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page 容量调整阈值
    private final boolean directByDefault;//分配buf是否为direct类型
    private final ByteBuf emptyBuf;

    /**
     * Instance use heap buffers by default
     */
    protected AbstractByteBufAllocator() {
        this(false);
    }

    /**
     * Create new instance
     *
     * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
     *                     a heap buffer
     */
    protected AbstractByteBufAllocator(boolean preferDirect) {
        directByDefault = preferDirect && PlatformDependent.hasUnsafe();
        emptyBuf = new EmptyByteBuf(this);
    }
}

来看创建字节buf:
@Override
public ByteBuf buffer() {
    if (directByDefault) {
        return directBuffer();
    }
    return heapBuffer();
}

@Override
public ByteBuf buffer(int initialCapacity) {
    if (directByDefault) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
    if (directByDefault) {
        return directBuffer(initialCapacity, maxCapacity);
    }
    return heapBuffer(initialCapacity, maxCapacity);
}

创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型

先来看分配direct类型buf
@Override
public ByteBuf directBuffer() {
    return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
/**
 * Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
 待子类扩展
 */
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);


再来看分配heap类型buf:
@Override
public ByteBuf heapBuffer() {
    return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
/**
 * Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity.
 待子类扩展
 */
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);


从上面可以看出创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。

来看创建ioBufer
@Override
public ByteBuf ioBuffer() {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(DEFAULT_INITIAL_CAPACITY);
    }
    return heapBuffer(DEFAULT_INITIAL_CAPACITY);
}

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity, maxCapacity);
    }
    return heapBuffer(initialCapacity, maxCapacity);
}


从上面可以看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型



再来看创建复合buf:

@Override
public CompositeByteBuf compositeBuffer() {
    if (directByDefault) {
        return compositeDirectBuffer();
    }
    return compositeHeapBuffer();
}

@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
    if (directByDefault) {
        return compositeDirectBuffer(maxNumComponents);
    }
    return compositeHeapBuffer(maxNumComponents);
}

从上面可以看出创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;

来看创建heap类型复合buf
@Override
public CompositeByteBuf compositeHeapBuffer() {
    return compositeHeapBuffer(DEFAULT_MAX_COMPONENTS);
}

@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
    return toLeakAwareBuffer(new CompositeByteBuf(this, false, maxNumComponents));
}

//追踪复合buf内存泄漏
protected static CompositeByteBuf toLeakAwareBuffer(CompositeByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareCompositeByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareCompositeByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}


//SimpleLeakAwareCompositeByteBuf
class SimpleLeakAwareCompositeByteBuf extends WrappedCompositeByteBuf {

    final ResourceLeakTracker<ByteBuf> leak;//内部资源泄漏探测器

    SimpleLeakAwareCompositeByteBuf(CompositeByteBuf wrapped, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped);
        this.leak = ObjectUtil.checkNotNull(leak, "leak");
    }
    ...
 }

//追踪字节buf内存泄漏
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}

再来看创建direct类型复合buf
@Override
public CompositeByteBuf compositeDirectBuffer() {
    return compositeDirectBuffer(DEFAULT_MAX_COMPONENTS);
}

@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
    return toLeakAwareBuffer(new CompositeByteBuf(this, true, maxNumComponents));
}


从上面可以看出创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。

再来看一下计算新容量
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    //如果新容量大于阈值,
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {//新容量+阈值大于最大容量
            newCapacity = maxCapacity;//则容量为最大容量
        } else {
            newCapacity += threshold;//否则增量为阈值
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    //否则从容量64开始,每次扩展一倍,直至大于最小新容量
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

再来看字节buf分配器度量提供者ByteBufAllocatorMetricProvider

public interface ByteBufAllocatorMetricProvider {

    /**
     * Returns a {@link ByteBufAllocatorMetric} for a {@link ByteBufAllocator}.
     返回一个字节buf分配器度量器
     */
    ByteBufAllocatorMetric metric();
}


public interface ByteBufAllocatorMetric {
    /**
     * Returns the number of bytes of heap memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
     返回字节buf分配使用的堆内存
     */
    long usedHeapMemory();

    /**
     * Returns the number of bytes of direct memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
     返回字节buf分配使用的direct内存
     */
    long usedDirectMemory();
}

总结:
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics