`
Donald_Draper
  • 浏览: 950738 次
社区版块
存档分类
最新评论

netty 复合buf概念

阅读更多
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
引言
前一篇文章我们看了抽象字节buf引用计数器,我们先来看回顾一下:
抽象字节引用计数器AbstractReferenceCountedByteBuf,内部有一个引用计数器,以及原子更新引用计数器的refCntUpdater(AbstractReferenceCountedByteBuf),更新引用计数器,实际通过refCntUpdater CAS操作,释放对象引用的时候,如果引用计数器为0,则释放对象相关资源。
今天我们将要看的是,抽象字节buf引用计数器一个具体实现CompositeByteBuf
package io.netty.buffer;

import io.netty.util.internal.EmptyArrays;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;

import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
 * A virtual buffer which shows multiple buffers as a single merged buffer.  It is recommended to use
 * {@link ByteBufAllocator#compositeBuffer()} or {@link Unpooled#wrappedBuffer(ByteBuf...)} instead of calling the
 * constructor explicitly.
 复合直接为一个虚拟的buf,将多个buf展示为一个合并的buf。强烈建议使用ByteBufAllocator#compositeBuffer()和
Unpooled#wrappedBuffer(ByteBuf...)构造复合buf。
 */
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {

    private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
    private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();

    private final ByteBufAllocator alloc;//字节buf分配器
    private final boolean direct;//是否为direct buf
    private final List<Component> components;//字节buf集合
    private final int maxNumComponents;//buf即最大容量
    private boolean freed;
}


先来看一下组件Component的定义
private static final class Component {
    final ByteBuf buf;//内部字节buf
    final int length;//buf字节长度
    int offset;//在复合buf中的开始位置
    int endOffset;//在复合buf中的结束位置

    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }

    void freeIfNecessary() {
        buf.release(); // We should not get a NPE here. If so, it must be a bug.
    }
}

从上来看,组件可以看做字节buf的包装。

再来看构造:
// Special constructor used by WrappedCompositeByteBuf
CompositeByteBuf(ByteBufAllocator alloc) {
    super(Integer.MAX_VALUE);
    this.alloc = alloc;
    direct = false;
    maxNumComponents = 0;
    components = Collections.emptyList();
}

public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
    super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    this.alloc = alloc;
    this.direct = direct;
    this.maxNumComponents = maxNumComponents;
    //创建字节buf集
    components = newList(maxNumComponents);
}

public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf... buffers) {
    this(alloc, direct, maxNumComponents, buffers, 0, buffers.length);
}

public CompositeByteBuf(
        ByteBufAllocator alloc, boolean direct, int maxNumComponents, Iterable<ByteBuf> buffers) {
    super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    if (maxNumComponents < 2) {
        throw new IllegalArgumentException(
                "maxNumComponents: " + maxNumComponents + " (expected: >= 2)");
    }

    this.alloc = alloc;
    this.direct = direct;
    this.maxNumComponents = maxNumComponents;
    components = newList(maxNumComponents);//创建字节buf集
    addComponents0(false, 0, buffers);//添加字节buf数组,到buf集
    consolidateIfNeeded();//调整底层字节buf
    setIndex(0, capacity());//更新复合buf读写索引
}

CompositeByteBuf(
        ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf[] buffers, int offset, int len) {
    super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    if (maxNumComponents < 2) {
        throw new IllegalArgumentException(
                "maxNumComponents: " + maxNumComponents + " (expected: >= 2)");
    }

    this.alloc = alloc;
    this.direct = direct;
    this.maxNumComponents = maxNumComponents;
    components = newList(maxNumComponents);//创建字节buf集
    addComponents0(false, 0, buffers, offset, len);//添加字节buf数组,到buf集
    consolidateIfNeeded();//调整底层字节buf
    setIndex(0, capacity());//更新复合buf读写索引
}

复合buf的构造主要有一下几点要看:

1.
components = newList(maxNumComponents);//创建字节buf集

2.
addComponents0(false, 0, buffers, offset, len);//添加字节buf数组,到buf集

3.
consolidateIfNeeded();//检查是否需要扩展buf集


4.
setIndex(0, capacity());//更新复合buf读写索引


我们分别来看这几点:
1.
components = newList(maxNumComponents);//创建字节buf集


private static List<Component> newList(int maxNumComponents) {
    return new ArrayList<Component>(Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents));
}


2.
addComponents0(false, 0, buffers, offset, len);//添加字节buf数组,到buf集


private int addComponents0(boolean increaseIndex, int cIndex, Iterable<ByteBuf> buffers) {
    if (buffers instanceof ByteBuf) {
        // If buffers also implements ByteBuf (e.g. CompositeByteBuf), it has to go to addComponent(ByteBuf).
	//处理实现CompositeByteBuf的字节buf场景
        return addComponent0(increaseIndex, cIndex, (ByteBuf) buffers);
    }
    checkNotNull(buffers, "buffers");
    //如果buffers不是集合类,则将buffers转换为字节buf集合
    if (!(buffers instanceof Collection)) {
        List<ByteBuf> list = new ArrayList<ByteBuf>();
        try {
            for (ByteBuf b: buffers) {
                list.add(b);
            }
            buffers = list;
        } finally {
            if (buffers != list) {
                for (ByteBuf b: buffers) {
                    if (b != null) {
                        try {
			    //释放源buf数组中的字节buf
                            b.release();
                        } catch (Throwable ignored) {
                            // ignore
                        }
                    }
                }
            }
        }
    }

    Collection<ByteBuf> col = (Collection<ByteBuf>) buffers;
    return addComponents0(increaseIndex, cIndex, col.toArray(new ByteBuf[col.size()]), 0 , col.size());
}

private int addComponents0(boolean increaseWriterIndex, int cIndex, ByteBuf[] buffers, int offset, int len) {
    checkNotNull(buffers, "buffers");
    int i = offset;
    try {
        checkComponentIndex(cIndex);

        // No need for consolidation
        while (i < len) {
            // Increment i now to prepare for the next iteration and prevent a duplicate release (addComponent0
            // will release if an exception occurs, and we also release in the finally block here).
            ByteBuf b = buffers[i++];
            if (b == null) {
                break;
            }
	    //添加字节buf到复合buf的索引cIndex上
            cIndex = addComponent0(increaseWriterIndex, cIndex, b) + 1;
            int size = components.size();
            if (cIndex > size) {
                cIndex = size;
            }
        }
        return cIndex;
    } finally {
        for (; i < len; ++i) {
            ByteBuf b = buffers[i];
            if (b != null) {
                try {
		    //释放源buf数组中的字节buf
                    b.release();
                } catch (Throwable ignored) {
                    // ignore
                }
            }
        }
    }
}

//检查索引是否越界
 private void checkComponentIndex(int cIndex) {
     ensureAccessible();
     if (cIndex < 0 || cIndex > components.size()) {
         throw new IndexOutOfBoundsException(String.format(
                 "cIndex: %d (expected: >= 0 && <= numComponents(%d))",
                 cIndex, components.size()));
     }
 }

**
 * Precondition is that {@code buffer != null}.
  添加字节buf到复合buf的索引cIndex上
 */
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
    assert buffer != null;
    boolean wasAdded = false;
    try {
        checkComponentIndex(cIndex);
        int readableBytes = buffer.readableBytes();
        // No need to consolidate - just add a component to the list.
        @SuppressWarnings("deprecation")
	//包装buf为buf组件
        Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
        if (cIndex == components.size()) {
            wasAdded = components.add(c);
            if (cIndex == 0) {
	        //buf添加到复合buf集第一个位置
                c.endOffset = readableBytes;
            } else {
	        //buf添加到复合buf集最后一个位置
                Component prev = components.get(cIndex - 1);
                c.offset = prev.endOffset;
                c.endOffset = c.offset + readableBytes;
            }
        } else {
	   //添加buf到复合buf集合
            components.add(cIndex, c);
            wasAdded = true;
            if (readableBytes != 0) {
	        //更新索引
                updateComponentOffsets(cIndex);
            }
        }
        if (increaseWriterIndex) {
	    //更新写索引
            writerIndex(writerIndex() + buffer.readableBytes());
        }
        return cIndex;
    } finally {
        if (!wasAdded) {
            buffer.release();
        }
    }
}

来看一下更新索引和更新写索引:

//更新复合buf集合中cIndex位置上的buf的开始位置和终止位置
private void updateComponentOffsets(int cIndex) {
    int size = components.size();
    if (size <= cIndex) {
        return;
    }

    Component c = components.get(cIndex);
    if (cIndex == 0) {
        c.offset = 0;
        c.endOffset = c.length;
        cIndex ++;
    }

    for (int i = cIndex; i < size; i ++) {
        Component prev = components.get(i - 1);
        Component cur = components.get(i);
        cur.offset = prev.endOffset;
        cur.endOffset = cur.offset + cur.length;
    }
}



//更新写索引
@Override
public CompositeByteBuf writerIndex(int writerIndex) {
    return (CompositeByteBuf) super.writerIndex(writerIndex);
}



3.
consolidateIfNeeded();//检查是否需要扩展buf集


/**
 * This should only be called as last operation from a method as this may adjust the underlying
 * array of components and so affect the index etc.
 */
private void consolidateIfNeeded() {
    // Consolidate if the number of components will exceed the allowed maximum by the current
    // operation.
    final int numComponents = components.size();
    if (numComponents > maxNumComponents) {
        //如果当前buf的数量大于复合buf集最大容量,则获取最后一个buf的结束位置,即复合buf的数据长度
        final int capacity = components.get(numComponents - 1).endOffset;
        //分配capacity容量的字节buf
        ByteBuf consolidated = allocBuffer(capacity);

        // We're not using foreach to avoid creating an iterator.
	//将复合buf集中的所有字节buf,整合到一个buf中
        for (int i = 0; i < numComponents; i ++) {
            Component c = components.get(i);
            ByteBuf b = c.buf;
            consolidated.writeBytes(b);
            c.freeIfNecessary();
        }
        Component c = new Component(consolidated);
        c.endOffset = c.length;
        components.clear();//清空原始复合buf集
        components.add(c);//添加整合后的buf到复合buf集
    }
}


//分配capacity容量的字节buf
private ByteBuf allocBuffer(int capacity) {
    return direct ? alloc().directBuffer(capacity) : alloc().heapBuffer(capacity);
}


4.
setIndex(0, capacity());//更新复合buf读写索引


@Override
public CompositeByteBuf setIndex(int readerIndex, int writerIndex) {
    return (CompositeByteBuf) super.setIndex(readerIndex, writerIndex);
}


从上面可以看出:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在
复合buf中的起始位置和结束位置,及buf可读数据长度。

其他添加字节buf到复合buf的的方法见附。


来看移除buf
/**
 * Remove the {@link ByteBuf} from the given index.
 *
 * @param cIndex the index on from which the {@link ByteBuf} will be remove
 */
public CompositeByteBuf removeComponent(int cIndex) {
    checkComponentIndex(cIndex);
    //直接从组件buf集合移除buf
    Component comp = components.remove(cIndex);
    comp.freeIfNecessary();
    if (comp.length > 0) {
        // Only need to call updateComponentOffsets if the length was > 0
        updateComponentOffsets(cIndex);//更新索引对应的字节buf的起止位置
    }
    return this;
}

/**
 * Remove the number of {@link ByteBuf}s starting from the given index.
 *
 * @param cIndex the index on which the {@link ByteBuf}s will be started to removed
 * @param numComponents the number of components to remove
 */
public CompositeByteBuf removeComponents(int cIndex, int numComponents) {
    checkComponentIndex(cIndex, numComponents);

    if (numComponents == 0) {
        return this;
    }
    List<Component> toRemove = components.subList(cIndex, cIndex + numComponents);
    boolean needsUpdate = false;
    for (Component c: toRemove) {
        if (c.length > 0) {
            needsUpdate = true;
        }
        c.freeIfNecessary();
    }
    //移除cIndex到cIndex + numComponents字节buf
    toRemove.clear();

    if (needsUpdate) {
        // Only need to call updateComponentOffsets if the length was > 0
        updateComponentOffsets(cIndex);//更新索引对应的字节buf的起止位置
    }
    return this;
}

再来看返回复合buf迭代器:
 @Override
public Iterator<ByteBuf> iterator() {
    ensureAccessible();
    if (components.isEmpty()) {
        return EMPTY_ITERATOR;
    }
    return new CompositeByteBufIterator();
}

//CompositeByteBufIterator
private final class CompositeByteBufIterator implements Iterator<ByteBuf> {
    private final int size = components.size();
    private int index;

    @Override
    public boolean hasNext() {
        return size > index;
    }

    @Override
    public ByteBuf next() {
        if (size != components.size()) {
            throw new ConcurrentModificationException();
        }
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        try {
            return components.get(index++).buf;
        } catch (IndexOutOfBoundsException e) {
            throw new ConcurrentModificationException();
        }
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Read-Only");
    }
}


再来从复合buf读取数据到字节buf集合
/**
 * Same with {@link #slice(int, int)} except that this method returns a list.
 */
public List<ByteBuf> decompose(int offset, int length) {
    checkIndex(offset, length);
    if (length == 0) {
        return Collections.emptyList();
    }
    //获取开始位置对应的buf的索引
    int componentId = toComponentIndex(offset);
    List<ByteBuf> slice = new ArrayList<ByteBuf>(components.size());
    // The first component
    Component firstC = components.get(componentId);
    ByteBuf first = firstC.buf.duplicate();
    first.readerIndex(offset - firstC.offset);
    ByteBuf buf = first;
    int bytesToSlice = length;
    do {
        int readableBytes = buf.readableBytes();
        if (bytesToSlice <= readableBytes) {
            // Last component
	    //字节长度小于索引对应的buf的可读字节数
            buf.writerIndex(buf.readerIndex() + bytesToSlice);
	    //直接添加buf
            slice.add(buf);
            break;
        } else {
	    //否则向后继续读取buf,直至读到length长度的字节
            // Not the last component
            slice.add(buf);
            bytesToSlice -= readableBytes;
            componentId ++;

            // Fetch the next component.
            buf = components.get(componentId).buf.duplicate();
        }
    } while (bytesToSlice > 0);

    // Slice all components because only readable bytes are interesting.
    //见字节buf对应slice 映射添加到指定的索引上
    for (int i = 0; i < slice.size(); i ++) {
        slice.set(i, slice.get(i).slice());
    }
    return slice;
}

再来看字节buf相关的方法:
//判断buf是否为direct类型
@Override
public boolean isDirect() {
    int size = components.size();
    if (size == 0) {
        return false;
    }
    for (int i = 0; i < size; i++) {
       if (!components.get(i).buf.isDirect()) {
           return false;
       }
    }
    return true;
}

//判断底层buf是否为字节数组
@Override
public boolean hasArray() {
    switch (components.size()) {
    case 0:
        return true;
    case 1:
        return components.get(0).buf.hasArray();
    default:
        return false;
    }
}

//转化复合buf为字节数组
@Override
public byte[] array() {
    switch (components.size()) {
    case 0:
        return EmptyArrays.EMPTY_BYTES;
    case 1:
        return components.get(0).buf.array();
    default:
        throw new UnsupportedOperationException();
    }
}

//设置读写索引
@Override
public CompositeByteBuf readerIndex(int readerIndex) {
    return (CompositeByteBuf) super.readerIndex(readerIndex);
}

@Override
public CompositeByteBuf writerIndex(int writerIndex) {
    return (CompositeByteBuf) super.writerIndex(writerIndex);
}

@Override
public CompositeByteBuf setIndex(int readerIndex, int writerIndex) {
    return (CompositeByteBuf) super.setIndex(readerIndex, writerIndex);
}

复合buf的其他方法,在附篇,我们只需要知道复合buf实际为字节buf数据概念即可,如果想进一步了解复合buf,可以看附篇的方法。
总结:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。


附:
这部分,不做详细详解,just see see...

添加字节buf到复合buf
/**
 * Add the given {@link ByteBuf}.
 * <p>
 * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
 * If you need to have it increased use {@link #addComponent(boolean, ByteBuf)}.
 添加字节buf到复合buf,不会更新写索引
 * <p>
 * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
 * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
 * {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponent(ByteBuf buffer) {
    return addComponent(false, buffer);
}

/**
 * Add the given {@link ByteBuf}s.
 * <p>
 * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
 * If you need to have it increased use {@link #addComponents(boolean, ByteBuf[])}.
 与上面不同的是,添加的是一个字节buf组
 * <p>
 * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
 * {@link CompositeByteBuf}.
 * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()}
 * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponents(ByteBuf... buffers) {
    return addComponents(false, buffers);
}

/**
 * Add the given {@link ByteBuf}s.
 * <p>
 * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
 * If you need to have it increased use {@link #addComponents(boolean, Iterable)}.
  添加具有迭代器属性的字节buf到复合buf,不会更新写索引
 * <p>
 * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
 * {@link CompositeByteBuf}.
 * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()}
 * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponents(Iterable<ByteBuf> buffers) {
    return addComponents(false, buffers);
}

/**
 * Add the given {@link ByteBuf} on the specific index.
 * <p>
 * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
 * If you need to have it increased use {@link #addComponent(boolean, int, ByteBuf)}.
 添加到字节buf到复合buf的指定索引上
 * <p>
 * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
 * @param cIndex the index on which the {@link ByteBuf} will be added.
 * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
 * {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) {
    return addComponent(false, cIndex, buffer);
}

/**
 * Add the given {@link ByteBuf} and increase the {@code writerIndex} if {@code increaseWriterIndex} is
 * {@code true}.
 *添加buf到复合buf,并根据increaseWriterIndex参数,决定是否更新复合buf写索引
 * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
 * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
 * {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
    checkNotNull(buffer, "buffer");
    addComponent0(increaseWriterIndex, components.size(), buffer);
    consolidateIfNeeded();
    return this;
}

/**
 * Add the given {@link ByteBuf}s and increase the {@code writerIndex} if {@code increaseWriterIndex} is
 * {@code true}.
 *与上面方法不同的是,可以添加多个buf
 * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
 * {@link CompositeByteBuf}.
 * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()}
 * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
    addComponents0(increaseWriterIndex, components.size(), buffers, 0, buffers.length);
    consolidateIfNeeded();
    return this;
}

/**
 * Add the given {@link ByteBuf}s and increase the {@code writerIndex} if {@code increaseWriterIndex} is
 * {@code true}.
 * 与上面方法不同的是,添加的buf为集合buf
 * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
 * {@link CompositeByteBuf}.
 * @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()}
 * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponents(boolean increaseWriterIndex, Iterable<ByteBuf> buffers) {
    addComponents0(increaseWriterIndex, components.size(), buffers);
    consolidateIfNeeded();
    return this;
}

/**
 * Add the given {@link ByteBuf} on the specific index and increase the {@code writerIndex}
 * if {@code increaseWriterIndex} is {@code true}.
 *添加buf到复合buf的指定索引上,并根据increaseWriterIndex参数,决定是否更新复合buf写索引
 * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
 * @param cIndex the index on which the {@link ByteBuf} will be added.
 * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
 * {@link CompositeByteBuf}.
 */
public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
    checkNotNull(buffer, "buffer");
    addComponent0(increaseWriterIndex, cIndex, buffer);
    consolidateIfNeeded();
    return this;
}


get*相关方法:
//获取位置index对应的字节
@Override
public byte getByte(int index) {
    return _getByte(index);
}

@Override
protected byte _getByte(int index) {
    Component c = findComponent(index);
    //委托给组件内部buf对应的方法
    return c.buf.getByte(index - c.offset);
}


//获取索引所在的组件buf
private Component findComponent(int offset) {
    checkIndex(offset);
    for (int low = 0, high = components.size(); low <= high;) {
        int mid = low + high >>> 1;
        Component c = components.get(mid);
        if (offset >= c.endOffset) {
            low = mid + 1;
        } else if (offset < c.offset) {
            high = mid - 1;
        } else {
            assert c.length != 0;
            return c;
        }
    }
    throw new Error("should not reach here");
}


@Override
protected int _getInt(int index) {
    Component c = findComponent(index);
    if (index + 4 <= c.endOffset) {
       //委托给组件内部buf对应的方法
        return c.buf.getInt(index - c.offset);
    } else if (order() == ByteOrder.BIG_ENDIAN) {
        return (_getShort(index) & 0xffff) << 16 | _getShort(index + 2) & 0xffff;
    } else {
        return _getShort(index) & 0xFFFF | (_getShort(index + 2) & 0xFFFF) << 16;
    }
}

其他get*原始类型方法,思路基本一致,首先获取位置所在的组件buf,
再委托给组件内部buf对应的get*方法。
再来看getBytes*相关方法
//读取复合buf数据到字节数组
@Override
public CompositeByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
    checkDstIndex(index, length, dstIndex, dst.length);
    if (length == 0) {
        return this;
    }
    //获取位置index开始的组件buf索引
    int i = toComponentIndex(index);
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
	//读取buf数据到目的buf
        s.getBytes(index - adjustment, dst, dstIndex, localLength);
        index += localLength;
        dstIndex += localLength;
        length -= localLength;
        i ++;
    }
    return this;
}

//读取buf到nio 字节buf
@Override
public CompositeByteBuf getBytes(int index, ByteBuffer dst) {
    int limit = dst.limit();
    int length = dst.remaining();

    checkIndex(index, length);
    if (length == 0) {
        return this;
    }

    int i = toComponentIndex(index);
    try {
        while (length > 0) {
            Component c = components.get(i);
            ByteBuf s = c.buf;
            int adjustment = c.offset;
            int localLength = Math.min(length, s.capacity() - (index - adjustment));
	    //调整目的buf limit
            dst.limit(dst.position() + localLength);
	    //读取组件buf数据到目的buf
            s.getBytes(index - adjustment, dst);
            index += localLength;
            length -= localLength;
            i ++;
        }
    } finally {
        dst.limit(limit);
    }
    return this;
}

再来看读取到字节buf,输出流,聚集字节buf
@Override
public CompositeByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
    checkDstIndex(index, length, dstIndex, dst.capacity());
    if (length == 0) {
        return this;
    }

    int i = toComponentIndex(index);
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
	//读buf数据,写到目的buf中
        s.getBytes(index - adjustment, dst, dstIndex, localLength);
        index += localLength;
        dstIndex += localLength;
        length -= localLength;
        i ++;
    }
    return this;
}

@Override
public int getBytes(int index, GatheringByteChannel out, int length)
        throws IOException {
    int count = nioBufferCount();
    if (count == 1) {
        return out.write(internalNioBuffer(index, length));
    } else {
        long writtenBytes = out.write(nioBuffers(index, length));
        if (writtenBytes > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        } else {
            return (int) writtenBytes;
        }
    }
}

//获取底层buf数
@Override
public int nioBufferCount() {
    switch (components.size()) {
    case 0:
        return 1;
    case 1:
        return components.get(0).buf.nioBufferCount();
    default:
        int count = 0;
        int componentsCount = components.size();
        for (int i = 0; i < componentsCount; i++) {
            Component c = components.get(i);
            count += c.buf.nioBufferCount();
        }
        return count;
    }
}
//获取底层nio字节buf
@Override
public ByteBuffer internalNioBuffer(int index, int length) {
    switch (components.size()) {
    case 0:
        return EMPTY_NIO_BUFFER;
    case 1:
        return components.get(0).buf.internalNioBuffer(index, length);
    default:
        throw new UnsupportedOperationException();
    }
}
//获取从位置index开始,长度为length字节数据的nio 字节buf
@Override
public ByteBuffer nioBuffer(int index, int length) {
    checkIndex(index, length);

    switch (components.size()) {
    case 0:
        return EMPTY_NIO_BUFFER;
    case 1:
        ByteBuf buf = components.get(0).buf;
        if (buf.nioBufferCount() == 1) {
            return components.get(0).buf.nioBuffer(index, length);
        }
    }

    ByteBuffer merged = ByteBuffer.allocate(length).order(order());
    ByteBuffer[] buffers = nioBuffers(index, length);

    for (ByteBuffer buf: buffers) {
        merged.put(buf);
    }

    merged.flip();
    return merged;
}
//从位置index开始,读取length个字节数据,并将读取的数据添加到nio字节buf数据
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
    checkIndex(index, length);
    if (length == 0) {
        return new ByteBuffer[] { EMPTY_NIO_BUFFER };
    }

    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(components.size());
    int i = toComponentIndex(index);
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        switch (s.nioBufferCount()) {
            case 0:
                throw new UnsupportedOperationException();
            case 1:
                buffers.add(s.nioBuffer(index - adjustment, localLength));
                break;
            default:
                Collections.addAll(buffers, s.nioBuffers(index - adjustment, localLength));
        }

        index += localLength;
        length -= localLength;
        i ++;
    }
    //转换读取的nio 字节buf集合 为nio字节buf数组
    return buffers.toArray(new ByteBuffer[buffers.size()]);
}

//读取数据到文件通道
@Override
public int getBytes(int index, FileChannel out, long position, int length)
        throws IOException {
    int count = nioBufferCount();
    if (count == 1) {
        return out.write(internalNioBuffer(index, length), position);
    } else {
        long writtenBytes = 0;
        for (ByteBuffer buf : nioBuffers(index, length)) {
            writtenBytes += out.write(buf, position + writtenBytes);
        }
        if (writtenBytes > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return (int) writtenBytes;
    }
}
//读取数据到输出流
@Override
public CompositeByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
    checkIndex(index, length);
    if (length == 0) {
        return this;
    }

    int i = toComponentIndex(index);
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        s.getBytes(index - adjustment, out, localLength);
        index += localLength;
        length -= localLength;
        i ++;
    }
    return this;
}

从上面可以看出,getBytes*方法,首先定位位置index所在的组件buf,然后通过字节buf
相应的getBytes*方法,将数据写到目的字节buf,nio buf,文件通道,输出流等。



再来看set方法:

@Override
public CompositeByteBuf setByte(int index, int value) {
    Component c = findComponent(index);
    c.buf.setByte(index - c.offset, value);
    return this;
}

@Override
protected void _setByte(int index, int value) {
    setByte(index, value);
}

@Override
public CompositeByteBuf setInt(int index, int value) {
    return (CompositeByteBuf) super.setInt(index, value);
}

@Override
protected void _setInt(int index, int value) {
    Component c = findComponent(index);
    if (index + 4 <= c.endOffset) {
        c.buf.setInt(index - c.offset, value);
    } else if (order() == ByteOrder.BIG_ENDIAN) {
        _setShort(index, (short) (value >>> 16));
        _setShort(index + 2, (short) value);
    } else {
        _setShort(index, (short) value);
        _setShort(index + 2, (short) (value >>> 16));
    }
}

set*原始类型方法,思路基本一致,首先获取位置所在的组件buf,
再委托给组件内部buf对应的set*方法。

再来看setBytes*方法:

@Override
public CompositeByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    if (length == 0) {
        return this;
    }

    int i = toComponentIndex(index);
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        s.setBytes(index - adjustment, src, srcIndex, localLength);
        index += localLength;
        srcIndex += localLength;
        length -= localLength;
        i ++;
    }
    return this;
}

@Override
public CompositeByteBuf setBytes(int index, ByteBuffer src) {
    int limit = src.limit();
    int length = src.remaining();

    checkIndex(index, length);
    if (length == 0) {
        return this;
    }

    int i = toComponentIndex(index);
    try {
        while (length > 0) {
            Component c = components.get(i);
            ByteBuf s = c.buf;
            int adjustment = c.offset;
            int localLength = Math.min(length, s.capacity() - (index - adjustment));
            src.limit(src.position() + localLength);
            s.setBytes(index - adjustment, src);
            index += localLength;
            length -= localLength;
            i ++;
        }
    } finally {
        src.limit(limit);
    }
    return this;
}

@Override
public CompositeByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.capacity());
    if (length == 0) {
        return this;
    }

    int i = toComponentIndex(index);
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        s.setBytes(index - adjustment, src, srcIndex, localLength);
        index += localLength;
        srcIndex += localLength;
        length -= localLength;
        i ++;
    }
    return this;
}

@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
    checkIndex(index, length);
    if (length == 0) {
        return in.read(EmptyArrays.EMPTY_BYTES);
    }

    int i = toComponentIndex(index);
    int readBytes = 0;

    do {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        if (localLength == 0) {
            // Skip empty buffer
            i++;
            continue;
        }
        int localReadBytes = s.setBytes(index - adjustment, in, localLength);
        if (localReadBytes < 0) {
            if (readBytes == 0) {
                return -1;
            } else {
                break;
            }
        }

        if (localReadBytes == localLength) {
            index += localLength;
            length -= localLength;
            readBytes += localLength;
            i ++;
        } else {
            index += localReadBytes;
            length -= localReadBytes;
            readBytes += localReadBytes;
        }
    } while (length > 0);

    return readBytes;
}

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    checkIndex(index, length);
    if (length == 0) {
        return in.read(EMPTY_NIO_BUFFER);
    }

    int i = toComponentIndex(index);
    int readBytes = 0;
    do {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        if (localLength == 0) {
            // Skip empty buffer
            i++;
            continue;
        }
        int localReadBytes = s.setBytes(index - adjustment, in, localLength);

        if (localReadBytes == 0) {
            break;
        }

        if (localReadBytes < 0) {
            if (readBytes == 0) {
                return -1;
            } else {
                break;
            }
        }

        if (localReadBytes == localLength) {
            index += localLength;
            length -= localLength;
            readBytes += localLength;
            i ++;
        } else {
            index += localReadBytes;
            length -= localReadBytes;
            readBytes += localReadBytes;
        }
    } while (length > 0);

    return readBytes;
}

@Override
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
    checkIndex(index, length);
    if (length == 0) {
        return in.read(EMPTY_NIO_BUFFER, position);
    }

    int i = toComponentIndex(index);
    int readBytes = 0;
    do {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        if (localLength == 0) {
            // Skip empty buffer
            i++;
            continue;
        }
        int localReadBytes = s.setBytes(index - adjustment, in, position + readBytes, localLength);

        if (localReadBytes == 0) {
            break;
        }

        if (localReadBytes < 0) {
            if (readBytes == 0) {
                return -1;
            } else {
                break;
            }
        }

        if (localReadBytes == localLength) {
            index += localLength;
            length -= localLength;
            readBytes += localLength;
            i ++;
        } else {
            index += localReadBytes;
            length -= localReadBytes;
            readBytes += localReadBytes;
        }
    } while (length > 0);

    return readBytes;
}


setBytes*方法,首先定位位置index所在的组件buf,然后通过字节buf
相应的setBytes*方法,将字节数组,字节buf,nio buf,文件通道,输出流等数据写到当前复合buf中。

再来看readBytes方法:

@Override
public CompositeByteBuf readBytes(ByteBuf dst) {
    return (CompositeByteBuf) super.readBytes(dst);
}

@Override
public CompositeByteBuf readBytes(ByteBuf dst, int length) {
    return (CompositeByteBuf) super.readBytes(dst, length);
}

@Override
public CompositeByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    return (CompositeByteBuf) super.readBytes(dst, dstIndex, length);
}

@Override
public CompositeByteBuf readBytes(byte[] dst) {
    return (CompositeByteBuf) super.readBytes(dst);
}

@Override
public CompositeByteBuf readBytes(byte[] dst, int dstIndex, int length) {
    return (CompositeByteBuf) super.readBytes(dst, dstIndex, length);
}

@Override
public CompositeByteBuf readBytes(ByteBuffer dst) {
    return (CompositeByteBuf) super.readBytes(dst);
}

@Override
public CompositeByteBuf readBytes(OutputStream out, int length) throws IOException {
    return (CompositeByteBuf) super.readBytes(out, length);
}

从上来看readBytes*方法,委托给父类抽象buf的readBytes*方法,实际通过getBytes*方法。


再来看write*方法:

@Override
public CompositeByteBuf writeByte(int value) {
    return (CompositeByteBuf) super.writeByte(value);
}
@Override
public CompositeByteBuf writeInt(int value) {
    return (CompositeByteBuf) super.writeInt(value);
}

@Override
public CompositeByteBuf writeBytes(ByteBuf src) {
    return (CompositeByteBuf) super.writeBytes(src);
}

@Override
public CompositeByteBuf writeBytes(ByteBuf src, int length) {
    return (CompositeByteBuf) super.writeBytes(src, length);
}

@Override
public CompositeByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
    return (CompositeByteBuf) super.writeBytes(src, srcIndex, length);
}

@Override
public CompositeByteBuf writeBytes(byte[] src) {
    return (CompositeByteBuf) super.writeBytes(src);
}

@Override
public CompositeByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    return (CompositeByteBuf) super.writeBytes(src, srcIndex, length);
}

@Override
public CompositeByteBuf writeBytes(ByteBuffer src) {
    return (CompositeByteBuf) super.writeBytes(src);
}

从上来看write*方法,委托给父类抽象buf的write*方法,实际通过write*方法。


再来看其他方法,看看就行:

@Override
public CompositeByteBuf discardSomeReadBytes() {
    return discardReadComponents();
}

 /**
  * Discard all {@link ByteBuf}s which are read.
  丢弃所有已经读取的字节buf组件
  */
 public CompositeByteBuf discardReadComponents() {
     ensureAccessible();
     final int readerIndex = readerIndex();
     if (readerIndex == 0) {
         return this;
     }

     // Discard everything if (readerIndex = writerIndex = capacity).
     int writerIndex = writerIndex();
     if (readerIndex == writerIndex && writerIndex == capacity()) {
         for (Component c: components) {
             c.freeIfNecessary();
         }
         components.clear();
         setIndex(0, 0);
         adjustMarkers(readerIndex);
         return this;
     }

     // Remove read components.
     int firstComponentId = toComponentIndex(readerIndex);
     for (int i = 0; i < firstComponentId; i ++) {
         components.get(i).freeIfNecessary();
     }
     components.subList(0, firstComponentId).clear();

     // Update indexes and markers.
     Component first = components.get(0);
     int offset = first.offset;
     updateComponentOffsets(0);
     setIndex(readerIndex - offset, writerIndex - offset);
     adjustMarkers(offset);
     return this;
 }

//丢弃已读buf组件
 @Override
 public CompositeByteBuf discardReadBytes() {
     ensureAccessible();
     final int readerIndex = readerIndex();
     if (readerIndex == 0) {
         return this;
     }

     // Discard everything if (readerIndex = writerIndex = capacity).
     int writerIndex = writerIndex();
     //已读完,则清空组件buf集
     if (readerIndex == writerIndex && writerIndex == capacity()) {
         for (Component c: components) {
             c.freeIfNecessary();
         }
         components.clear();
         setIndex(0, 0);
         adjustMarkers(readerIndex);
         return this;
     }
     //丢弃已经读取的组件buf
     // Remove read components.
     int firstComponentId = toComponentIndex(readerIndex);
     for (int i = 0; i < firstComponentId; i ++) {
         components.get(i).freeIfNecessary();
     }
     components.subList(0, firstComponentId).clear();

     // Remove or replace the first readable component with a new slice.
     Component c = components.get(0);
     int adjustment = readerIndex - c.offset;
     if (adjustment == c.length) {
         // new slice would be empty, so remove instead
         components.remove(0);
     } else {
         Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
         components.set(0, newC);
     }

     // Update indexes and markers.
     updateComponentOffsets(0);
     setIndex(0, writerIndex - readerIndex);
     adjustMarkers(readerIndex);
     return this;
 }
//释放复合buf
@Override
protected void deallocate() {
    if (freed) {
        return;
    }

    freed = true;
    int size = components.size();
    // We're not using foreach to avoid creating an iterator.
    // see https://github.com/netty/netty/issues/2642
    for (int i = 0; i < size; i++) {
        components.get(i).freeIfNecessary();
    }
}

@Override
public ByteBuf unwrap() {
    return null;
}

@Override
public boolean hasMemoryAddress() {
    switch (components.size()) {
    case 0:
        return Unpooled.EMPTY_BUFFER.hasMemoryAddress();
    case 1:
        return components.get(0).buf.hasMemoryAddress();
    default:
        return false;
    }
}

@Override
public long memoryAddress() {
    switch (components.size()) {
    case 0:
        return Unpooled.EMPTY_BUFFER.memoryAddress();
    case 1:
        return components.get(0).buf.memoryAddress();
    default:
        throw new UnsupportedOperationException();
    }
}

@Override
public int capacity() {
    final int numComponents = components.size();
    if (numComponents == 0) {
        return 0;
    }
    return components.get(numComponents - 1).endOffset;
}

@Override
public CompositeByteBuf retain(int increment) {
    return (CompositeByteBuf) super.retain(increment);
}

@Override
public CompositeByteBuf retain() {
    return (CompositeByteBuf) super.retain();
}

@Override
public CompositeByteBuf touch() {
    return this;
}

@Override
public CompositeByteBuf touch(Object hint) {
    return this;
}

@Override
public ByteBuf copy(int index, int length) {
    checkIndex(index, length);
    ByteBuf dst = Unpooled.buffer(length);
    if (length != 0) {
        copyTo(index, length, toComponentIndex(index), dst);
    }
    return dst;
}

private void copyTo(int index, int length, int componentId, ByteBuf dst) {
    int dstIndex = 0;
    int i = componentId;

    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        s.getBytes(index - adjustment, dst, dstIndex, localLength);
        index += localLength;
        dstIndex += localLength;
        length -= localLength;
        i ++;
    }

    dst.writerIndex(dst.capacity());
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics