`
Donald_Draper
  • 浏览: 950716 次
社区版块
存档分类
最新评论

netty Unpooled字节buf分配器

 
阅读更多
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
引言
上一篇我们看了抽象字节buf分配器,先来回顾一下:
创建字节buf时,主要根据字节buf分配器的directByDefault属性,来决定分配的buf是direct类型还是heap类型;direct和heap buf的实际创建分别通过newDirectBuffer和newHeapBuffer方法完成,这两个方法待子类扩展。ioBuffer方法创建的字节buf优先为direct类型,只有当系统平台不支持Unsafe时,才为heap类型。创建复合buf时,同样根据directByDefault属性来决定其为direct类型还是heap类型;如果资源泄漏探测功能开启,则追踪复合buf的内存泄漏情况。

今天我们来看抽象字节buf分配器的一个具体实现:非池类字节buf分配器UnpooledByteBufAllocator。
/**
 * A {@link ByteBufAllocator} that performs no pooling: every request results in a fresh buffer.
 */
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {

    /**
     * Shared default allocator. It is built through the single-argument constructor, so leak
     * detection stays enabled, and it prefers direct buffers whenever
     * {@link PlatformDependent#directBufferPreferred()} says so.
     */
    public static final UnpooledByteBufAllocator DEFAULT =
            new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());

    /** Metric object holding the heap / direct usage counters of this allocator. */
    private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();

    /** {@code true} when leak detection has been switched off for this allocator. */
    private final boolean disableLeakDetector;

    /**
     * Creates an allocator with explicit control over leak detection.
     *
     * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
     *                     a heap buffer
     * @param disableLeakDetector {@code true} to turn leak detection off completely for this allocator, e.g. when
     *                            the user is happy to let the GC deal with direct buffers that were not
     *                            released explicitly
     */
    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
        super(preferDirect);
        this.disableLeakDetector = disableLeakDetector;
    }

    /**
     * Creates an allocator that keeps leak detection enabled.
     *
     * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
     *                     a heap buffer
     */
    public UnpooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, false);
    }
}


来看一下度量器
/**
 * Metric view of the allocator, backed by one counter for direct memory and one for heap memory.
 */
private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
    /** Counter behind {@link #usedDirectMemory()}. */
    final LongCounter directCounter = PlatformDependent.newLongCounter();
    /** Counter behind {@link #usedHeapMemory()}. */
    final LongCounter heapCounter = PlatformDependent.newLongCounter();

    /** @return the current value of the heap counter */
    @Override
    public long usedHeapMemory() {
        return heapCounter.value();
    }

    /** @return the current value of the direct counter */
    @Override
    public long usedDirectMemory() {
        return directCounter.value();
    }

    /** @return the simple class name followed by both counter readings */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder(StringUtil.simpleClassName(this));
        text.append("(usedHeapMemory: ").append(usedHeapMemory());
        text.append("; usedDirectMemory: ").append(usedDirectMemory());
        text.append(')');
        return text.toString();
    }
}


// LongCounter: counter abstraction over a long value
/**
 * Counter for a {@code long} value.
 */
public interface LongCounter {
    /** @return the value currently held by the counter */
    long value();

    /**
     * Changes the counter by {@code delta}. Callers in this file also pass negated amounts,
     * so implementations are presumably expected to accept negative deltas.
     */
    void add(long delta);

    /** Single-step counterpart of {@link #add(long)} in the upward direction (by name; step size not shown here). */
    void increment();

    /** Single-step counterpart of {@link #add(long)} in the downward direction (by name; step size not shown here). */
    void decrement();
}

来看创建堆buf
/**
 * Creates an instrumented heap buffer owned by this allocator.
 * The Unsafe-based variant is chosen when {@link PlatformDependent#hasUnsafe()} reports
 * {@code true}; otherwise the plain heap variant is used.
 */
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

堆buf有两种,我们分别来看
先看InstrumentedUnpooledUnsafeHeapByteBuf
1.
//InstrumentedUnpooledUnsafeHeapByteBuf
/**
 * Unsafe heap buffer that reports every array allocation / release to the owning
 * {@link UnpooledByteBufAllocator} so its heap usage counter stays current.
 */
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {
    InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    /** Allocates through the superclass, then adds the array length to the allocator's heap usage. */
    @Override
    byte[] allocateArray(int initialCapacity) {
        final byte[] allocated = super.allocateArray(initialCapacity);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.incrementHeap(allocated.length);
        return allocated;
    }

    /** Frees through the superclass, then subtracts the array length from the allocator's heap usage. */
    @Override
    void freeArray(byte[] array) {
        // Read the length before handing the array to the superclass.
        final int released = array.length;
        super.freeArray(array);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.decrementHeap(released);
    }
}


// UnpooledUnsafeHeapByteBuf
// Heap buffer variant that extends UnpooledHeapByteBuf; only its constructor is shown in this
// excerpt (the remaining members are elided), so any Unsafe-specific behaviour is not visible here.
class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {

    /**
     * Creates a new heap buffer with a newly allocated byte array.
     * Simply forwards all arguments to the {@link UnpooledHeapByteBuf} constructor.
     *
     * @param alloc the allocator that owns this buffer
     * @param initialCapacity the initial capacity of the underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     */
    UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }
    ...
}

2.
再来看另一种堆buf InstrumentedUnpooledHeapByteBuf
//InstrumentedUnpooledHeapByteBuf
/**
 * Plain heap buffer that reports every array allocation / release to the owning
 * {@link UnpooledByteBufAllocator} so its heap usage counter stays current.
 */
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {
    InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    /** Allocates through the superclass, then adds the array length to the allocator's heap usage. */
    @Override
    byte[] allocateArray(int initialCapacity) {
        final byte[] allocated = super.allocateArray(initialCapacity);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.incrementHeap(allocated.length);
        return allocated;
    }

    /** Frees through the superclass, then subtracts the array length from the allocator's heap usage. */
    @Override
    void freeArray(byte[] array) {
        // Read the length before handing the array to the superclass.
        final int released = array.length;
        super.freeArray(array);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.decrementHeap(released);
    }
}

我们来看一下Unpooled堆字节buf:
/**
 * Big endian Java heap buffer implementation.
 * The buffer's content lives in a plain {@code byte[]}; several members are elided
 * ({@code ...}) in this excerpt.
 */
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;
    byte[] array;// byte array that stores the buffer's data
    private ByteBuffer tmpNioBuf;// cached NIO ByteBuffer wrapping the array, created lazily by internalNioBuffer()

    /**
     * Creates a new heap buffer with a newly allocated byte array.
     *
     * @param alloc the allocator that owns this buffer; must not be {@code null}
     * @param initialCapacity the initial capacity of the underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     * @throws IllegalArgumentException if {@code initialCapacity} is greater than {@code maxCapacity}
     */
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        checkNotNull(alloc, "alloc");

        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        // allocateArray is an overridable hook, so subclasses can observe the allocation.
        setArray(allocateArray(initialCapacity));
        // A freshly allocated buffer starts with reader and writer index at 0.
        setIndex(0, 0);
    }

    /**
     * Creates a new heap buffer with an existing byte array.
     *
     * @param alloc the allocator that owns this buffer; must not be {@code null}
     * @param initialArray the initial underlying byte array; must not be {@code null}
     * @param maxCapacity the max capacity of the underlying byte array
     * @throws IllegalArgumentException if {@code initialArray.length} is greater than {@code maxCapacity}
     */
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
        super(maxCapacity);

        checkNotNull(alloc, "alloc");
        checkNotNull(initialArray, "initialArray");

        if (initialArray.length > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
        }

        this.alloc = alloc;
        setArray(initialArray);
        // The whole wrapped array counts as written: readerIndex 0, writerIndex = array length.
        setIndex(0, initialArray.length);
    }
    // Allocates a byte array of initialCapacity bytes; subclasses override this to track usage.
    byte[] allocateArray(int initialCapacity) {
        return new byte[initialCapacity];
    }
    ...
    // Copies length bytes from this buffer (starting at index) into dst (starting at dstIndex).
    @Override
    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
        checkDstIndex(index, length, dstIndex, dst.length);
        System.arraycopy(array, index, dst, dstIndex, length);
        return this;
    }
    ....
    // Copies length bytes from src into this buffer, choosing the copy path by what src exposes:
    // a memory address, a backing array, or neither.
    @Override
    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.capacity());
        if (src.hasMemoryAddress()) {
            PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);
        } else  if (src.hasArray()) {
            setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
        } else {
            src.getBytes(srcIndex, array, index, length);
        }
        return this;
    }

    // Copies length bytes from the src array (starting at srcIndex) into this buffer at index.
    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.length);
        System.arraycopy(src, srcIndex, array, index, length);
        return this;
    }
    ...

    // Returns a sliced NIO ByteBuffer wrapping the requested region of the backing array.
    @Override
    public ByteBuffer nioBuffer(int index, int length) {
        ensureAccessible();
        return ByteBuffer.wrap(array, index, length).slice();
    }

    // A heap buffer is backed by a single array, so the result always has exactly one element.
    @Override
    public ByteBuffer[] nioBuffers(int index, int length) {
        return new ByteBuffer[] { nioBuffer(index, length) };
    }

    // Repositions the cached internal NIO buffer onto [index, index + length) and returns it.
    @Override
    public ByteBuffer internalNioBuffer(int index, int length) {
        checkIndex(index, length);
        return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
    }
    ...
    // Returns the cached NIO wrapper around the array, creating it on first use.
    private ByteBuffer internalNioBuffer() {
        ByteBuffer tmpNioBuf = this.tmpNioBuf;
        if (tmpNioBuf == null) {
            this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
        }
        return tmpNioBuf;
    }
    ...
    // Passes the array to the freeArray hook, then drops the reference to it.
     @Override
    protected void deallocate() {
        freeArray(array);
        array = null;
    }
    // Hook for subclasses; does nothing in this class.
    void freeArray(byte[] array) {
        // NOOP
    }
}

从上面可以看出,非池类堆字节buf,实际为一个字节数组。

// Heap usage bookkeeping is delegated to the allocator's internal metric object.
/** Adds {@code amount} to the heap usage counter. */
void incrementHeap(int amount) {
    final LongCounter heapUsage = metric.heapCounter;
    heapUsage.add(amount);
}
/** Subtracts {@code amount} from the heap usage counter by adding its negation. */
void decrementHeap(int amount) {
    final int negated = -amount;
    metric.heapCounter.add(negated);
}


再来看分配direct buf:
/**
 * Creates an instrumented direct buffer owned by this allocator.
 * Without Unsafe the plain direct variant is used; with Unsafe the choice between the
 * "no cleaner" and the regular Unsafe variant follows
 * {@link PlatformDependent#useDirectBufferNoCleaner()}. Unless leak detection is disabled,
 * the buffer is passed through {@code toLeakAwareBuffer} before being returned.
 */
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf allocated;
    if (!PlatformDependent.hasUnsafe()) {
        allocated = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    } else if (PlatformDependent.useDirectBufferNoCleaner()) {
        allocated = new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        allocated = new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    if (disableLeakDetector) {
        return allocated;
    }
    return toLeakAwareBuffer(allocated);
}

从上面来看direct buf有三种,分别为InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf,
InstrumentedUnpooledUnsafeDirectByteBuf,InstrumentedUnpooledDirectByteBuf

我们分别来看这三种:
1.
/**
 * "No cleaner" Unsafe direct buffer that reports allocations, reallocations and releases to the
 * owning {@link UnpooledByteBufAllocator} so its direct usage counter stays current.
 */
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
        extends UnpooledUnsafeNoCleanerDirectByteBuf {
    InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    /** Allocates through the superclass, then adds the buffer capacity to the direct usage. */
    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        final ByteBuffer allocated = super.allocateDirect(initialCapacity);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.incrementDirect(allocated.capacity());
        return allocated;
    }

    /** Reallocates through the superclass, then applies the capacity difference to the direct usage. */
    @Override
    ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
        // Capture the old capacity before the superclass touches the buffer.
        final int before = oldBuffer.capacity();
        final ByteBuffer resized = super.reallocateDirect(oldBuffer, initialCapacity);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.incrementDirect(resized.capacity() - before);
        return resized;
    }

    /** Frees through the superclass, then subtracts the buffer capacity from the direct usage. */
    @Override
    protected void freeDirect(ByteBuffer buffer) {
        // Capture the capacity before the buffer is freed.
        final int released = buffer.capacity();
        super.freeDirect(buffer);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.decrementDirect(released);
    }
}

// UnpooledUnsafeNoCleanerDirectByteBuf
// Direct buffer variant that extends UnpooledUnsafeDirectByteBuf; only the constructor is shown in
// this excerpt (other members are elided), so the "no cleaner" behaviour itself is not visible here.
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {

    // Forwards all arguments to the UnpooledUnsafeDirectByteBuf constructor.
    UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }
    ...
}

再来看第二种:
2.
/**
 * Unsafe direct buffer that reports allocations and releases to the owning
 * {@link UnpooledByteBufAllocator} so its direct usage counter stays current.
 */
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
    InstrumentedUnpooledUnsafeDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    /** Allocates through the superclass, then adds the buffer capacity to the direct usage. */
    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        final ByteBuffer allocated = super.allocateDirect(initialCapacity);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.incrementDirect(allocated.capacity());
        return allocated;
    }

    /** Frees through the superclass, then subtracts the buffer capacity from the direct usage. */
    @Override
    protected void freeDirect(ByteBuffer buffer) {
        // Capture the capacity before the buffer is freed.
        final int released = buffer.capacity();
        super.freeDirect(buffer);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.decrementDirect(released);
    }
}

// UnpooledUnsafeDirectByteBuf
/**
 * A NIO {@link ByteBuffer} based buffer.  It is recommended to use {@link Unpooled#directBuffer(int)}
 * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
 * constructor explicitly.
 */
public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;
    private ByteBuffer tmpNioBuf;
    private int capacity;// capacity
    // When true this buffer must NOT free the underlying memory; set to !doFree for wrapped buffers.
    // NOTE(review): presumably consulted during deallocation, which is elided from this excerpt.
    private boolean doNotFree;
    ByteBuffer buffer;// the underlying NIO ByteBuffer
    long memoryAddress;// NOTE(review): presumably the native address of buffer; its assignment is not shown here
    /**
     * Creates a new direct buffer.
     *
     * @param alloc           the allocator that owns this buffer; must not be {@code null}
     * @param initialCapacity the initial capacity of the underlying direct buffer
     * @param maxCapacity     the maximum capacity of the underlying direct buffer
     * @throws NullPointerException     if {@code alloc} is {@code null}
     * @throws IllegalArgumentException if a capacity is negative or {@code initialCapacity > maxCapacity}
     */
    protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        // Allocation goes through the overridable allocateDirect hook so subclasses can observe it.
        setByteBuffer(allocateDirect(initialCapacity), false);
    }

    /**
     * Creates a new direct buffer by wrapping the specified initial buffer.
     *
     * @param alloc         the allocator that owns this buffer
     * @param initialBuffer the direct buffer to wrap; a slice of it is used
     * @param maxCapacity the maximum capacity of the underlying direct buffer
     */
    protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
        // We never try to free the buffer if it was provided by the end-user as we not know if this is an duplicate or
        // an slice. This is done to prevent an IllegalArgumentException when using Java9 as Unsafe.invokeCleaner(...)
        // will check if the given buffer is either an duplicate or slice and in this case throw an
        // IllegalArgumentException.
        //
        // See http://hg.openjdk.java.net/jdk9/hs-demo/jdk/file/0d2ab72ba600/src/jdk.unsupported/share/classes/
        // sun/misc/Unsafe.java#l1250
        //
        // We also call slice() explicitly here to preserve behaviour with previous netty releases.
        this(alloc, initialBuffer.slice(), maxCapacity, false);
    }

    // Wraps an existing, writable direct buffer. doFree states whether this instance may free the
    // wrapped memory; it is stored inverted in doNotFree.
    UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity, boolean doFree) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialBuffer == null) {
            throw new NullPointerException("initialBuffer");
        }
        if (!initialBuffer.isDirect()) {
            throw new IllegalArgumentException("initialBuffer is not a direct buffer.");
        }
        if (initialBuffer.isReadOnly()) {
            throw new IllegalArgumentException("initialBuffer is a read-only buffer.");
        }

        // The wrapped buffer's remaining bytes define the initial capacity.
        int initialCapacity = initialBuffer.remaining();
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        doNotFree = !doFree;
        setByteBuffer(initialBuffer.order(ByteOrder.BIG_ENDIAN), false);
        // All wrapped bytes count as already written.
        writerIndex(initialCapacity);
    }

    /**
     * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
     * The allocation is delegated to {@link ByteBuffer#allocateDirect(int)}.
     */
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
   ...
}

从上面我们可以看出,非池类Direct buf,实际为一个nio 字节buf。
再来看第三种:
3.
/**
 * Plain (non-Unsafe) direct buffer that reports allocations and releases to the owning
 * {@link UnpooledByteBufAllocator} so its direct usage counter stays current.
 */
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {
    InstrumentedUnpooledDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

    /** Allocates through the superclass, then adds the buffer capacity to the direct usage. */
    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        final ByteBuffer allocated = super.allocateDirect(initialCapacity);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.incrementDirect(allocated.capacity());
        return allocated;
    }

    /** Frees through the superclass, then subtracts the buffer capacity from the direct usage. */
    @Override
    protected void freeDirect(ByteBuffer buffer) {
        // Capture the capacity before the buffer is freed.
        final int released = buffer.capacity();
        super.freeDirect(buffer);
        final UnpooledByteBufAllocator owner = (UnpooledByteBufAllocator) alloc();
        owner.decrementDirect(released);
    }
}



// UnpooledDirectByteBuf
/**
 * A NIO {@link ByteBuffer} based buffer.  It is recommended to use {@link Unpooled#directBuffer(int)}
 * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
 * constructor explicitly.
 */
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;

    private ByteBuffer buffer;// the underlying NIO ByteBuffer
    private ByteBuffer tmpNioBuf;
    private int capacity;// capacity
    // When true this buffer must NOT free the underlying memory; set for buffers wrapped from user input.
    // NOTE(review): presumably consulted during deallocation, which is elided from this excerpt.
    private boolean doNotFree;

    /**
     * Creates a new direct buffer.
     *
     * @param alloc           the allocator that owns this buffer; must not be {@code null}
     * @param initialCapacity the initial capacity of the underlying direct buffer
     * @param maxCapacity     the maximum capacity of the underlying direct buffer
     * @throws NullPointerException     if {@code alloc} is {@code null}
     * @throws IllegalArgumentException if a capacity is negative or {@code initialCapacity > maxCapacity}
     */
    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        // Allocate through the overridable allocateDirect(...) hook instead of calling
        // ByteBuffer.allocateDirect(...) directly. Subclasses that override the hook to track
        // direct-memory usage would otherwise miss the initial allocation while still seeing the
        // matching free, leaving their accounting unbalanced. The Unsafe-based sibling class
        // already allocates this way.
        setByteBuffer(allocateDirect(initialCapacity));
    }

    /**
     * Creates a new direct buffer by wrapping the specified initial buffer.
     * A big-endian slice of {@code initialBuffer} is used, and this instance is marked as not
     * owning the memory ({@code doNotFree = true}).
     *
     * @param alloc         the allocator that owns this buffer; must not be {@code null}
     * @param initialBuffer the writable direct buffer to wrap; must not be {@code null}
     * @param maxCapacity the maximum capacity of the underlying direct buffer
     * @throws NullPointerException     if {@code alloc} or {@code initialBuffer} is {@code null}
     * @throws IllegalArgumentException if {@code initialBuffer} is not direct, is read-only, or has
     *                                  more remaining bytes than {@code maxCapacity}
     */
    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialBuffer == null) {
            throw new NullPointerException("initialBuffer");
        }
        if (!initialBuffer.isDirect()) {
            throw new IllegalArgumentException("initialBuffer is not a direct buffer.");
        }
        if (initialBuffer.isReadOnly()) {
            throw new IllegalArgumentException("initialBuffer is a read-only buffer.");
        }

        // The wrapped buffer's remaining bytes define the initial capacity.
        int initialCapacity = initialBuffer.remaining();
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        doNotFree = true;
        setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
        // All wrapped bytes count as already written.
        writerIndex(initialCapacity);
    }

    /**
     * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
     * The allocation is delegated to {@link ByteBuffer#allocateDirect(int)}; subclasses may
     * override this hook to observe allocations.
     */
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
    ...
}


再来看更新direct 内存使用量,更新操作,直接委托给内部度量器:
/** Adds {@code amount} to the direct usage counter held by the allocator's metric. */
void incrementDirect(int amount) {
    final LongCounter directUsage = metric.directCounter;
    directUsage.add(amount);
}

/** Subtracts {@code amount} from the direct usage counter by adding its negation. */
void decrementDirect(int amount) {
    final int negated = -amount;
    metric.directCounter.add(negated);
}

再来看复合buf

/**
 * Creates a composite buffer flagged as non-direct with the given component limit.
 * Unless leak detection is disabled, the buffer is passed through {@code toLeakAwareBuffer}.
 */
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
    final CompositeByteBuf composite = new CompositeByteBuf(this, false, maxNumComponents);
    if (disableLeakDetector) {
        return composite;
    }
    return toLeakAwareBuffer(composite);
}

/**
 * Creates a composite buffer flagged as direct with the given component limit.
 * Unless leak detection is disabled, the buffer is passed through {@code toLeakAwareBuffer}.
 */
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
    final CompositeByteBuf composite = new CompositeByteBuf(this, true, maxNumComponents);
    if (disableLeakDetector) {
        return composite;
    }
    return toLeakAwareBuffer(composite);
}




总结:

非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中分配字节缓存;非池类Direct buf,实际为一个nio 字节buf(通过ByteBuffer.allocateDirect分配),其字节缓存位于Java堆之外的本地内存中。Unpooled创建字节buf时,实际委托给内部的字节buf分配器UnpooledByteBufAllocator。

附:
我们再来看一下Unpooled:
/**
 * Creates a new {@link ByteBuf} by allocating new space or by wrapping
 * or copying existing byte arrays, byte buffers and a string.
 *
 * <h3>Use static import</h3>
 * This class is intended to be used with Java 5 static import statement:
 *
 * <pre>
 * import static io.netty.buffer.{@link Unpooled}.*;
 *
 * {@link ByteBuf} heapBuffer    = buffer(128);
 * {@link ByteBuf} directBuffer  = directBuffer(256);
 * {@link ByteBuf} wrappedBuffer = wrappedBuffer(new byte[128], new byte[256]);
 * {@link ByteBuf} copiedBuffer  = copiedBuffer({@link ByteBuffer}.allocate(128));
 * </pre>
 *
 * <h3>Allocating a new buffer</h3>
 *
 * The following buffer types are provided out of the box.
 *
 * <ul>
 * <li>{@link #buffer(int)} allocates a new fixed-capacity heap buffer.</li>
 * <li>{@link #directBuffer(int)} allocates a new fixed-capacity direct buffer.</li>
 * </ul>
 *
 * <h3>Creating a wrapped buffer</h3>
 *
 * Wrapped buffer is a buffer which is a view of one or more existing
 * byte arrays and byte buffers.  Any changes in the content of the original
 * array or buffer will be visible in the wrapped buffer.  Various wrapper
 * methods are provided and their name is all {@code wrappedBuffer()}.
 * You might want to take a look at the methods that accept varargs closely if
 * you want to create a buffer which is composed of more than one array to
 * reduce the number of memory copy.
 *
 * <h3>Creating a copied buffer</h3>
 *
 * Copied buffer is a deep copy of one or more existing byte arrays, byte
 * buffers or a string.  Unlike a wrapped buffer, there's no shared data
 * between the original data and the copied buffer.  Various copy methods are
 * provided and their name is all {@code copiedBuffer()}.  It is also convenient
 * to use this operation to merge multiple buffers into one buffer.
 */
public final class Unpooled {
    // Allocator behind the factory methods: the shared UnpooledByteBufAllocator.DEFAULT instance.
    private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;

    /**
     * Big endian byte order.
     */
    public static final ByteOrder BIG_ENDIAN = ByteOrder.BIG_ENDIAN;

    /**
     * Little endian byte order.
     */
    public static final ByteOrder LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN;

    /**
     * A buffer whose capacity is {@code 0}.
     * Obtained from {@code ALLOC}, so it must be declared after that field.
     */
    public static final ByteBuf EMPTY_BUFFER = ALLOC.buffer(0, 0);

    static {
        // Sanity check, active only when assertions are enabled (-ea).
        assert EMPTY_BUFFER instanceof EmptyByteBuf: "EMPTY_BUFFER must be an EmptyByteBuf.";
    }
}

再来看分配buf:

/**
 * Returns a new heap buffer obtained from the shared allocator via its no-argument
 * {@code heapBuffer()} call, i.e. with the allocator's default sizing.
 */
public static ByteBuf buffer() {
    final ByteBuf heap = ALLOC.heapBuffer();
    return heap;
}

/**
 * Returns a new direct buffer obtained from the shared allocator via its no-argument
 * {@code directBuffer()} call, i.e. with the allocator's default sizing.
 */
public static ByteBuf directBuffer() {
    final ByteBuf direct = ALLOC.directBuffer();
    return direct;
}

/**
 * Returns a new heap buffer obtained from the shared allocator with the requested
 * initial capacity.
 *
 * @param initialCapacity the initial capacity handed to the allocator's {@code heapBuffer(int)}
 */
public static ByteBuf buffer(int initialCapacity) {
    final ByteBuf heap = ALLOC.heapBuffer(initialCapacity);
    return heap;
}

/**
 * Returns a new direct buffer obtained from the shared allocator with the requested
 * initial capacity.
 *
 * @param initialCapacity the initial capacity handed to the allocator's {@code directBuffer(int)}
 */
public static ByteBuf directBuffer(int initialCapacity) {
    final ByteBuf direct = ALLOC.directBuffer(initialCapacity);
    return direct;
}

/**
 * Returns a new composite buffer with no components, using
 * {@code AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS} as the component limit.
 */
public static CompositeByteBuf compositeBuffer() {
    final int componentLimit = AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS;
    return compositeBuffer(componentLimit);
}

/**
 * Returns a new composite buffer with no components, flagged as non-direct and tied to
 * the shared allocator.
 *
 * @param maxNumComponents the component limit passed to the {@link CompositeByteBuf} constructor
 */
public static CompositeByteBuf compositeBuffer(int maxNumComponents) {
    final CompositeByteBuf composite = new CompositeByteBuf(ALLOC, false, maxNumComponents);
    return composite;
}

从上面来看Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。

包装buf相关的方法:
/**
 * Wraps {@code array} in a heap buffer without copying it, so changes to the array's
 * content are visible through the returned buffer. An empty array yields
 * {@code EMPTY_BUFFER}.
 */
public static ByteBuf wrappedBuffer(byte[] array) {
    final int size = array.length;
    return size == 0 ? EMPTY_BUFFER : new UnpooledHeapByteBuf(ALLOC, array, size);
}


/**
 * Wraps the readable bytes of {@code buffer}; changes to its content are visible through
 * the returned buffer.
 *
 * @param buffer The buffer to wrap. Reference count ownership of this variable is transferred to this method.
 * @return a slice of {@code buffer} when it is readable; otherwise {@code buffer} is released and
 * {@code EMPTY_BUFFER} is returned. The caller is responsible for releasing the returned buffer.
 */
public static ByteBuf wrappedBuffer(ByteBuf buffer) {
    if (!buffer.isReadable()) {
        // Nothing to expose: give up the reference handed to us.
        buffer.release();
        return EMPTY_BUFFER;
    }
    return buffer.slice();
}

/**
 * Wraps the readable bytes of the given buffers without copying them; changes to their
 * content are visible through the returned buffer.
 *
 * @param maxNumComponents Advisement as to how many independent buffers are allowed to exist before
 * consolidation occurs.
 * @param buffers The buffers to wrap. Reference count ownership of all variables is transferred to this method.
 * @return {@code EMPTY_BUFFER} when no buffer is readable; a wrapped view of the single buffer when
 * exactly one is given; otherwise a composite starting at the first readable buffer. The caller is
 * responsible for releasing this buffer.
 */
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
    final int count = buffers.length;
    if (count == 1) {
        final ByteBuf only = buffers[0];
        if (only.isReadable()) {
            return wrappedBuffer(only.order(BIG_ENDIAN));
        }
        only.release();
    } else if (count > 1) {
        for (int idx = 0; idx < buffers.length; idx++) {
            final ByteBuf candidate = buffers[idx];
            if (candidate.isReadable()) {
                // Leading unreadable buffers were already released; compose from here on.
                return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, idx, buffers.length);
            }
            candidate.release();
        }
    }
    return EMPTY_BUFFER;
}

...

拷贝buf相关的方法:

**
 * Creates a new big-endian buffer whose content is a copy of the
 * specified {@code array}.  The new buffer's {@code readerIndex} and
 * {@code writerIndex} are {@code 0} and {@code array.length} respectively.
 */
public static ByteBuf copiedBuffer(byte[] array) {
    if (array.length == 0) {
        return EMPTY_BUFFER;
    }
    return wrappedBuffer(array.clone());
}

/**
 * Copies the readable bytes of {@code buffer} into a newly allocated buffer. The source
 * buffer's reader index is left untouched because the copy reads from an absolute index.
 *
 * @return the copy, or {@code EMPTY_BUFFER} when {@code buffer} has no readable bytes
 */
public static ByteBuf copiedBuffer(ByteBuf buffer) {
    final int length = buffer.readableBytes();
    if (length <= 0) {
        return EMPTY_BUFFER;
    }
    final ByteBuf duplicate = buffer(length);
    duplicate.writeBytes(buffer, buffer.readerIndex(), length);
    return duplicate;
}


/**
 * Returns an unreleasable view of the given {@link ByteBuf}: the wrapper ignores
 * {@code release} and {@code retain} calls.
 */
public static ByteBuf unreleasableBuffer(ByteBuf buf) {
    final ByteBuf view = new UnreleasableByteBuf(buf);
    return view;
}


// UnreleasableByteBuf
// Wrapper whose reference-counting methods are no-ops, so the wrapped buffer cannot be
// retained or released through it. Some members are elided in this excerpt.
final class UnreleasableByteBuf extends WrappedByteBuf {

    // NOTE(review): presumably a cached byte-order-swapped view; its use is not shown in this excerpt.
    private SwappedByteBuf swappedBuf;
    ...
    // retain is ignored: the reference count is left untouched.
     @Override
    public ByteBuf retain(int increment) {
        return this;
    }

    // retain is ignored: the reference count is left untouched.
    @Override
    public ByteBuf retain() {
        return this;
    }

    // touch is ignored as well.
    @Override
    public ByteBuf touch() {
        return this;
    }

    // touch is ignored as well; the hint is discarded.
    @Override
    public ByteBuf touch(Object hint) {
        return this;
    }

    // release is ignored and always reports false.
    @Override
    public boolean release() {
        return false;
    }

    // release is ignored and always reports false.
    @Override
    public boolean release(int decrement) {
        return false;
    }
}

// WrappedByteBuf: can be thought of as a static proxy around another ByteBuf.
// The methods shown forward to the wrapped buffer; remaining members are elided in this excerpt.
class WrappedByteBuf extends ByteBuf {

    // The wrapped buffer that calls are forwarded to.
    protected final ByteBuf buf; 
    // Reads the reader index from the wrapped buffer.
    @Override
    public final int readerIndex() {
        return buf.readerIndex();
    }

    // Sets the reader index on the wrapped buffer, but returns this wrapper for chaining.
    @Override
    public final ByteBuf readerIndex(int readerIndex) {
        buf.readerIndex(readerIndex);
        return this;
    }

    // Reads the writer index from the wrapped buffer.
    @Override
    public final int writerIndex() {
        return buf.writerIndex();
    }

    // Sets the writer index on the wrapped buffer, but returns this wrapper for chaining.
    @Override
    public final ByteBuf writerIndex(int writerIndex) {
        buf.writerIndex(writerIndex);
        return this;
    }
    ...
}


/**
 * Wraps the given {@link ByteBuf}s in an unmodifiable {@link ByteBuf}. Be aware the returned
 * {@link ByteBuf} will not try to slice the given {@link ByteBuf}s to reduce GC-Pressure.
 */
public static ByteBuf unmodifiableBuffer(ByteBuf... buffers) {
    final ByteBuf readOnlyView = new FixedCompositeByteBuf(ALLOC, buffers);
    return readOnlyView;
}

// FixedCompositeByteBuf
/**
 * {@link ByteBuf} implementation which allows to wrap an array of {@link ByteBuf} in a read-only mode.
 * This is useful to write an array of {@link ByteBuf}s.
 * Only the field declarations are shown in this excerpt; constructors and methods are elided.
 */
final class FixedCompositeByteBuf extends AbstractReferenceCountedByteBuf {
    // Single-element array holding the shared empty buffer.
    private static final ByteBuf[] EMPTY = { Unpooled.EMPTY_BUFFER };
    private final int nioBufferCount;
    private final int capacity;
    private final ByteBufAllocator allocator;
    private final ByteOrder order;
    // Declared as Object[] rather than ByteBuf[]; the reason is not visible in this excerpt.
    private final Object[] buffers;
    private final boolean direct;
    ...
}



//Unpooled





  • 大小: 86.2 KB
  • 大小: 81.8 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics