`
Donald_Draper
  • 浏览: 950750 次
社区版块
存档分类
最新评论

netty 抽象字节buf解析

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
netty 默认通道配置后续:http://donald-draper.iteye.com/blog/2393510
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
引言
本打算上一文章看一抽象字节buf,但中途遇到了资源泄漏探测器,就简单分析了资源泄漏探测器,今天我们回到抽象字节buf,先来回顾字节buf接口的定义:
对象引用计数器ReferenceCounted,主要记录对象的引用数量,当引用数量为0时,表示可以回收对象,在调试模式下,如果发现对象出现内存泄漏,可以用touch方法记录操作的相关信息,通过ResourceLeakDetector获取操作的相关信息,以便分析内存泄漏的原因。

字节缓存ByteBuf继承了对象引用计数器ReferenceCounted,拥有一个最大容量限制,如果用户尝试用 #capacity(int)和 #ensureWritable(int)方法,增加buf容量超过最大容量,将会抛出非法参数异常;字节buf有两个索引,一个为读索引readerIndex,一个为写索引writerIndex,读索引不能大于写索引,写索引不能小于读索引,buf可读字节数为writerIndex - readerIndex,buf可写字节数为capacity - writerIndex,buf可写的最大字节数为maxCapacity - writerIndex;

可以使用markReader/WriterIndex标记当前buf读写索引位置,resetReader/WriterIndex方法可以重回先前标记的索引位置;

当内存空间负载过度时,我们可以使用discardReadBytes丢弃一些数据,以节省空间;

我们可以使用ensureWritable检测当buf是否有足够的空间写数据;

提供了getBytes方法,可以将buf中的数据转移到目的ByteBuf,Byte数组,Nio字节buf ByteBuffer,OutputStream,聚集字节通道
GatheringByteChannel和文件通道FileChannel中,这些方法不会修改当前buf读写索引,具体是否修改目的对象索引或位置,见java doc 描述。

提供了setBytes方法,可以将源ByteBuf,Byte数组,Nio字节buf ByteBuffer,InputputStream,分散字节通道ScatteringByteChannel和文件通道FileChannel中的数据转移到当前buf中,这些方法不会修改当前buf的读写索引,至于源对象索引或位置,见java doc 描述。

提供了readBytes方法,可以将buf中的数据转移到目的ByteBuf,Byte数组,Nio字节buf ByteBuffer,OutputStream,聚集字节通道GatheringByteChannel和文件通道FileChannel中,这些方法具体会会修改当前buf读索引,至于会不会修改源对象索引或位置,见java doc 描述。

提供了writeBytes方法,可以将源ByteBuf,Byte数组,Nio字节buf ByteBuffer,
InputputStream,分散字节通道ScatteringByteChannel和文件通道FileChannel中的数据写到当前buf中,这些方法会修改当前buf的写索引,至于会不会修改源对象索引或位置,见java
doc 描述。


set*原始类型方法不会修改读写索引;
get*原始类型方法不会修改读写索引;

write*原始类型方法会修改写索引;
read*原始类型方法,会修改读索引;

字节buf中的set/get*方法不会修改当前buf的读写索引,而write*修改写索引,read*会修改读索引;

提供了copy,slice和retainSlice,duplicate和retainedDuplicate方法,用于拷贝,切割,复制当前buf数据,retained*方法会增加buf的
引用计数器;

提供nioBuffer和nioBuffers方法,用于包装当前buf可读数据为java nio ByteBuffer和ByteBuffer数组。

今天我们来看一下抽象字节buf的定义
package io.netty.buffer;

import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;

import static io.netty.util.internal.MathUtil.isOutOfBounds;

/**
 * A skeletal implementation of a buffer.
 */
public abstract class AbstractByteBuf extends ByteBuf {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class);
    private static final String PROP_MODE = "io.netty.buffer.bytebuf.checkAccessible";
    private static final boolean checkAccessible;//访问buf时,是否可需要检查

    static {
        checkAccessible = SystemPropertyUtil.getBoolean(PROP_MODE, true);
        if (logger.isDebugEnabled()) {
            logger.debug("-D{}: {}", PROP_MODE, checkAccessible);
        }
    }
    //内存泄漏探测器
    static final ResourceLeakDetector<ByteBuf> leakDetector =
            ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);

    int readerIndex;//读索引
    int writerIndex;//写索引
    private int markedReaderIndex;//读索引标记
    private int markedWriterIndex;//写索引标记
    private int maxCapacity;//最大容量

    protected AbstractByteBuf(int maxCapacity) {
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
        }
        this.maxCapacity = maxCapacity;
    }:
}

从上面来看,字节buf内部有两个索引,一个读索引,一个写索引,两个索引标记,即读写索引对应的标记,buf的最大容量为maxCapacity;buf的构造,主要是初始化最大容量。

先来看读写索引相关的操作,很简单,看一下就了。
字节buf,读写索引设值获取、标记重置相关方法:
@Override
public boolean isReadOnly() {
    return false;
}

@SuppressWarnings("deprecation")
@Override
public ByteBuf asReadOnly() {
    if (isReadOnly()) {
        return this;
    }
    return Unpooled.unmodifiableBuffer(this);
}

@Override
public int maxCapacity() {
    return maxCapacity;
}

protected final void maxCapacity(int maxCapacity) {
    this.maxCapacity = maxCapacity;
}

@Override
public int readerIndex() {
    return readerIndex;
}

@Override
public ByteBuf readerIndex(int readerIndex) {
    if (readerIndex < 0 || readerIndex > writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex: %d (expected: 0 <= readerIndex <= writerIndex(%d))", readerIndex, writerIndex));
    }
    this.readerIndex = readerIndex;
    return this;
}

@Override
public int writerIndex() {
    return writerIndex;
}

@Override
public ByteBuf writerIndex(int writerIndex) {
    if (writerIndex < readerIndex || writerIndex > capacity()) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex: %d (expected: readerIndex(%d) <= writerIndex <= capacity(%d))",
                writerIndex, readerIndex, capacity()));
    }
    this.writerIndex = writerIndex;
    return this;
}


@Override
public ByteBuf clear() {
    readerIndex = writerIndex = 0;
    return this;
}

@Override
public boolean isReadable() {
    return writerIndex > readerIndex;
}

@Override
public boolean isReadable(int numBytes) {
    return writerIndex - readerIndex >= numBytes;
}

@Override
public boolean isWritable() {
    return capacity() > writerIndex;
}

@Override
public boolean isWritable(int numBytes) {
    return capacity() - writerIndex >= numBytes;
}

@Override
public int readableBytes() {
    return writerIndex - readerIndex;
}

@Override
public int writableBytes() {
    return capacity() - writerIndex;
}

@Override
public int maxWritableBytes() {
    return maxCapacity() - writerIndex;
}

@Override
public ByteBuf markReaderIndex() {
    markedReaderIndex = readerIndex;
    return this;
}

@Override
public ByteBuf resetReaderIndex() {
    readerIndex(markedReaderIndex);
    return this;
}

@Override
public ByteBuf markWriterIndex() {
    markedWriterIndex = writerIndex;
    return this;
}

@Override
public ByteBuf resetWriterIndex() {
    writerIndex = markedWriterIndex;
    return this;
}


来看设置读写索引方法
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
                readerIndex, writerIndex, capacity()));
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}

final void setIndex0(int readerIndex, int writerIndex) {
    this.readerIndex = readerIndex;
    this.writerIndex = writerIndex;
}

再来看丢弃已经读过的字节数据:

@Override
public ByteBuf discardReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }
    if (readerIndex != writerIndex) {
        //更新索引
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
	////更新读写索引标记
        adjustMarkers(readerIndex);
        readerIndex = 0;
    } else {
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}


/**
 * Should be called by every method that tries to access the buffers content to check
 * if the buffer was released before.
 确保buf可以访问
 */
protected final void ensureAccessible() {
    if (checkAccessible && refCnt() == 0) {
        throw new IllegalReferenceCountException(0);
    }
}

//更新读写索引标记
protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}


从上面可以看出,丢弃已读数据方法discardReadBytes,丢弃buf数据时,只修改读写索引和相应的标记,
并不删除数据。

再来看,根据负载丢弃数据方法:
@Override
public ByteBuf discardSomeReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex == writerIndex) {
        //读写索引相等,则更新读写索引为0
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
        return this;
    }

    if (readerIndex >= capacity() >>> 1) {
        //丢弃已读的数据,与discardReadBytes方法作用相同
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
        adjustMarkers(readerIndex);
        readerIndex = 0;
    }
    return this;
}

来看get*方法:
来看获取一个字节:
@Override
public byte getByte(int index) {
    checkIndex(index);
    return _getByte(index);
}
//待子类扩展
protected abstract byte _getByte(int index);

检查索引是否越界
protected final void checkIndex(int index) {
    checkIndex(index, 1);
}

protected final void checkIndex(int index, int fieldLength) {
    ensureAccessible();
    checkIndex0(index, fieldLength);
}

final void checkIndex0(int index, int fieldLength) {
    if (isOutOfBounds(index, fieldLength, capacity())) {
        throw new IndexOutOfBoundsException(String.format(
                "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
    }
}


//MathUtil
package io.netty.util.internal;
/**
 * Math utility methods.
 */
public final class MathUtil {
    /**
     * Determine if the requested {@code index} and {@code length} will fit within {@code capacity}.
     * @param index The starting index.
     * @param length The length which will be utilized (starting from {@code index}).
     * @param capacity The capacity that {@code index + length} is allowed to be within.
     * @return {@code true} if the requested {@code index} and {@code length} will fit within {@code capacity}.
     * {@code false} if this would result in an index out of bounds exception.
     */
    public static boolean isOutOfBounds(int index, int length, int capacity) {
        return (index | length | (index + length) | (capacity - (index + length))) < 0;
    }
    ...
}

再看获取一个int值:

@Override
public int getInt(int index) {
    checkIndex(index, 4);
    return _getInt(index);
}

protected abstract int _getInt(int index);


其他get原始类型方法,思路基本相同;

来看getBytes(*)方法:


@Override
public ByteBuf getBytes(int index, ByteBuf dst) {
    getBytes(index, dst, dst.writableBytes());
    return this;
}

@Override
public ByteBuf getBytes(int index, ByteBuf dst, int length) {
    getBytes(index, dst, dst.writerIndex(), length);
    //更新目的buf索引
    dst.writerIndex(dst.writerIndex() + length);
    return this;
}

@Override
public ByteBuf getBytes(int index, byte[] dst) {
    getBytes(index, dst, 0, dst.length);
    return this;
}


字节buf接口定义中:
public abstract ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length);
public abstract ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length);


从上面可以看出,getBytes(...,ByteBuf,...)方法不会修改当前buf读写索引,会修改目的buf的写索引。getBytes(...,byte[],...)方法不会修改当前buf读写索引。


@Override
public CharSequence readCharSequence(int length, Charset charset) {
    CharSequence sequence = getCharSequence(readerIndex, length, charset);
    readerIndex += length;
    return sequence;
}


@Override
public CharSequence getCharSequence(int index, int length, Charset charset) {
    // TODO: We could optimize this for UTF8 and US_ASCII
    return toString(index, length, charset);
}

@Override
public String toString(Charset charset) {
    return toString(readerIndex, readableBytes(), charset);
}

@Override
public String toString(int index, int length, Charset charset) {
    return ByteBufUtil.decodeString(this, index, length, charset);
}

//ByteBufUtil
/**
 * A collection of utility methods that is related with handling {@link ByteBuf},
 * such as the generation of hex dump and swapping an integer's byte order.
 */
public final class ByteBufUtil {
   //Java nio CharBuffer 线程本地字符buf
   private static final FastThreadLocal<CharBuffer> CHAR_BUFFERS = new FastThreadLocal<CharBuffer>() {
           @Override
           protected CharBuffer initialValue() throws Exception {
               return CharBuffer.allocate(1024);
           }
       };
   //根据字符编码解析字节buf为字符串
   static String decodeString(ByteBuf src, int readerIndex, int len, Charset charset) {
       if (len == 0) {
           return StringUtil.EMPTY_STRING;
       }
       //获取字符编码
       final CharsetDecoder decoder = CharsetUtil.decoder(charset);
       final int maxLength = (int) ((double) len * decoder.maxCharsPerByte());
       //获取线程本地字符buf
       CharBuffer dst = CHAR_BUFFERS.get();
       if (dst.length() < maxLength) {
          //重新分配maxLength长度的字符buf
           dst = CharBuffer.allocate(maxLength);
           if (maxLength <= MAX_CHAR_BUFFER_SIZE) {
	      //添加buf到线程本地字符buf缓存
               CHAR_BUFFERS.set(dst);
           }
       } else {
           //清除线程本地字符buf
           dst.clear();
       }
       if (src.nioBufferCount() == 1) {
           // Use internalNioBuffer(...) to reduce object creation.
	   //使用源字节buf的内部nio 字节buf,解码数据
           decodeString(decoder, src.internalNioBuffer(readerIndex, len), dst);
       } else {
           // We use a heap buffer as CharsetDecoder is most likely able to use a fast-path if src and dst buffers
           // are both backed by a byte array.
	   //否则分配一个堆buf
           ByteBuf buffer = src.alloc().heapBuffer(len);
           try {
	       //将源buf数据写到,写到新的堆buf中
               buffer.writeBytes(src, readerIndex, len);
               // Use internalNioBuffer(...) to reduce object creation.
                //使用buf的内部nio 字节buf,解码数据
               decodeString(decoder, buffer.internalNioBuffer(buffer.readerIndex(), len), dst);
           } finally {
               // Release the temporary buffer again. 释放buf
               buffer.release();
           }
       }
       //返回解码结果
       return dst.flip().toString();
   }
   //根据字符解码器器,解码nio字节buf数据到字符buf
   private static void decodeString(CharsetDecoder decoder, ByteBuffer src, CharBuffer dst) {
       try {
           //委托给字符解码器
           CoderResult cr = decoder.decode(src, dst, true);
           if (!cr.isUnderflow()) {
               cr.throwException();
           }
	   //将解码后的数据写到目的字符buf中
           cr = decoder.flush(dst);
           if (!cr.isUnderflow()) {
               cr.throwException();
           }
       } catch (CharacterCodingException x) {
           throw new IllegalStateException(x);
       }
   }
}


回到抽象字节buf,来看set方法:

@Override
public ByteBuf setByte(int index, int value) {
    checkIndex(index);
    _setByte(index, value);
    return this;
}
//待子类扩展
protected abstract void _setByte(int index, int value);

@Override
    public ByteBuf setInt(int index, int value) {
        checkIndex(index, 4);
        _setInt(index, value);
        return this;
    }
//待子类扩展
protected abstract void _setInt(int index, int value);


其他set原始类型方法,思路基本相同。

再来看setBytes(*)系列方法:

@Override
public ByteBuf setBytes(int index, ByteBuf src) {
    setBytes(index, src, src.readableBytes());
    return this;
}

@Override
public ByteBuf setBytes(int index, ByteBuf src, int length) {
    checkIndex(index, length);
    if (src == null) {
        throw new NullPointerException("src");
    }
    if (length > src.readableBytes()) {
        throw new IndexOutOfBoundsException(String.format(
                "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
    }
    setBytes(index, src, src.readerIndex(), length);
    //更新源字节buf的读索引
    src.readerIndex(src.readerIndex() + length);
    return this;
}


@Override
public ByteBuf setBytes(int index, byte[] src) {
    setBytes(index, src, 0, src.length);
    return this;
}

//ByteBuf
public abstract ByteBuf setBytes(int index, byte[] src, int srcIndex, int length);
public abstract ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length);



从上面可以看出,setBytes(...,ByteBuf,...)方法不会修改当前buf读写索引,会修改源buf的读索引。setBytes(...,byte[],...)方法不会修改当前buf读写索引。


来set写字符序列
@Override
public int setCharSequence(int index, CharSequence sequence, Charset charset) {
    if (charset.equals(CharsetUtil.UTF_8)) {
        //确保有足有容量可写
        ensureWritable(ByteBufUtil.utf8MaxBytes(sequence));
        return ByteBufUtil.writeUtf8(this, index, sequence, sequence.length());
    }
    if (charset.equals(CharsetUtil.US_ASCII)) {
        //ASCII编码,长度为字符序列长度
        int len = sequence.length();
        ensureWritable(len);
        return ByteBufUtil.writeAscii(this, index, sequence, len);
    }
    //获取字符序列,charset编码对应的字节数组
    byte[] bytes = sequence.toString().getBytes(charset);
    //确保buf容量足够
    ensureWritable(bytes.length);
    //委托给setBytes
    setBytes(index, bytes);
    return bytes.length;
}

//ByteBufUtil
/**
 * Returns max bytes length of UTF8 character sequence.
 返回字符系列UTF-8编码的长度
 */
public static int utf8MaxBytes(CharSequence seq) {
    return seq.length() * MAX_BYTES_PER_CHAR_UTF8;
}

// Fast-Path implementation
static int writeUtf8(AbstractByteBuf buffer, int writerIndex, CharSequence seq, int len) {
    int oldWriterIndex = writerIndex;

    // We can use the _set methods as these not need to do any index checks and reference checks.
    // This is possible as we called ensureWritable(...) before.
    for (int i = 0; i < len; i++) {
       //遍历字符序列,将字符编码成UTF-8字节,写入字节buf中
        char c = seq.charAt(i);
        if (c < 0x80) {
            buffer._setByte(writerIndex++, (byte) c);
        } else if (c < 0x800) {
            buffer._setByte(writerIndex++, (byte) (0xc0 | (c >> 6)));
            buffer._setByte(writerIndex++, (byte) (0x80 | (c & 0x3f)));
        } else if (isSurrogate(c)) {
            if (!Character.isHighSurrogate(c)) {
                buffer._setByte(writerIndex++, WRITE_UTF_UNKNOWN);
                continue;
            }
            final char c2;
            try {
                // Surrogate Pair consumes 2 characters. Optimistically try to get the next character to avoid
                // duplicate bounds checking with charAt. If an IndexOutOfBoundsException is thrown we will
                // re-throw a more informative exception describing the problem.
                c2 = seq.charAt(++i);
            } catch (IndexOutOfBoundsException e) {
                buffer._setByte(writerIndex++, WRITE_UTF_UNKNOWN);
                break;
            }
            if (!Character.isLowSurrogate(c2)) {
                buffer._setByte(writerIndex++, WRITE_UTF_UNKNOWN);
                buffer._setByte(writerIndex++, Character.isHighSurrogate(c2) ? WRITE_UTF_UNKNOWN : c2);
                continue;
            }
            int codePoint = Character.toCodePoint(c, c2);
            // See http://www.unicode.org/versions/Unicode7.0.0/ch03.pdf#G2630.
            buffer._setByte(writerIndex++, (byte) (0xf0 | (codePoint >> 18)));
            buffer._setByte(writerIndex++, (byte) (0x80 | ((codePoint >> 12) & 0x3f)));
            buffer._setByte(writerIndex++, (byte) (0x80 | ((codePoint >> 6) & 0x3f)));
            buffer._setByte(writerIndex++, (byte) (0x80 | (codePoint & 0x3f)));
        } else {
            buffer._setByte(writerIndex++, (byte) (0xe0 | (c >> 12)));
            buffer._setByte(writerIndex++, (byte) (0x80 | ((c >> 6) & 0x3f)));
            buffer._setByte(writerIndex++, (byte) (0x80 | (c & 0x3f)));
        }
    }
    return writerIndex - oldWriterIndex;
}

// Fast-Path implementation
static int writeAscii(AbstractByteBuf buffer, int writerIndex, CharSequence seq, int len) {

    // We can use the _set methods as these not need to do any index checks and reference checks.
    // This is possible as we called ensureWritable(...) before.
    for (int i = 0; i < len; i++) {
        buffer._setByte(writerIndex++, (byte) seq.charAt(i));
    }
    return len;
}



回到抽象字节buf,来看read*相关方法:
先来看读一个字节数据
@Override
public byte readByte() {
   //检查是否有1个可读字节数据
    checkReadableBytes0(1);
    int i = readerIndex;
    //获取读索引对应的字节
    byte b = _getByte(i);
    //更新读索引
    readerIndex = i + 1;
    return b;
}

//检查是否有minimumReadableBytes个可读字节数据
private void checkReadableBytes0(int minimumReadableBytes) {
    ensureAccessible();
    if (readerIndex > writerIndex - minimumReadableBytes) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                readerIndex, minimumReadableBytes, writerIndex, this));
    }
}

@Override
public int readInt() {
    //读一个int值,所以检查是否有4个可读字节数据
    checkReadableBytes0(4);
    //委托给_getInt
    int v = _getInt(readerIndex);
    readerIndex += 4;
    return v;
}

其他get*原始类型方法,思路基本相同

来看readBytes(*)相关方法:
//读取当前buf数据,写到字节数组中
@Override
public ByteBuf readBytes(byte[] dst) {
    readBytes(dst, 0, dst.length);
    return this;
}
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
    checkReadableBytes(length);
    //委托getBytes
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;//更新读索引
    return this;
}

从上面来看readBytes(byte[],...),会修改当前buf的读索引
@Override
public ByteBuf readBytes(ByteBuf dst) {
    readBytes(dst, dst.writableBytes());
    return this;
}

//读当前buf数据,写到目的字节buf中
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
    if (length > dst.writableBytes()) {
        throw new IndexOutOfBoundsException(String.format(
                "length(%d) exceeds dst.writableBytes(%d) where dst is: %s", length, dst.writableBytes(), dst));
    }
    readBytes(dst, dst.writerIndex(), length);
    dst.writerIndex(dst.writerIndex() + length);//更新目的buf的写索引
    return this;
}

@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);//更新目的buf的写索引
    readerIndex += length;//更新读索引
    return this;
}

//读当前buf数据到nio 字节buf
@Override
public ByteBuf readBytes(ByteBuffer dst) {
    int length = dst.remaining();
    checkReadableBytes(length);
    getBytes(readerIndex, dst);
    readerIndex += length;
    return this;
}

//ByteBuf
public abstract ByteBuf getBytes(int index, ByteBuffer dst);


//读当前buf数据到GatheringByteChannel
@Override
public int readBytes(GatheringByteChannel out, int length)
        throws IOException {
    checkReadableBytes(length);
    int readBytes = getBytes(readerIndex, out, length);
    readerIndex += readBytes;
    return readBytes;
}


//ByteBuf
public abstract int getBytes(int index, GatheringByteChannel out, int length) throws IOException;

//读当前buf数据到FileChannel
@Override
public int readBytes(FileChannel out, long position, int length)
        throws IOException {
    checkReadableBytes(length);
    int readBytes = getBytes(readerIndex, out, position, length);
    readerIndex += readBytes;
    return readBytes;
}

//ByteBuf
 public abstract int getBytes(int index, FileChannel out, long position, int length) throws IOException


//读当前buf数据到OutputStream
@Override
public ByteBuf readBytes(OutputStream out, int length) throws IOException {
    checkReadableBytes(length);
    getBytes(readerIndex, out, length);
    readerIndex += length;
    return this;
}

//ByteBuf
public abstract ByteBuf getBytes(int index, OutputStream out, int length) throws IOException;


从上面可以看出,read*原始类型方法会修改当前buf读索引,readBytes(...,ByteBuf,...)方法会修改当前buf读索引,同时会修改目的buf的写索引,readBytes(...,byte[],...)方法会修改当前buf读索引。read*操作实际委托个get*的相关操作,同时更新buf读索引。

再来看readSlice相关方法
@Override
public ByteBuf readSlice(int length) {
    ByteBuf slice = slice(readerIndex, length);
    readerIndex += length;
    return slice;
}

@Override
public ByteBuf readRetainedSlice(int length) {
    ByteBuf slice = retainedSlice(readerIndex, length);
    readerIndex += length;
    return slice;
}

@Override
public ByteBuf slice(int index, int length) {
    return new UnpooledSlicedByteBuf(this, index, length);
}

@Override
public ByteBuf retainedSlice(int index, int length) {
    return slice(index, length).retain();
}

//UnpooledSlicedByteBuf
class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
	UnpooledSlicedByteBuf(AbstractByteBuf buffer, int index, int length) {
	    super(buffer, index, length);
	 @Override
    public AbstractByteBuf unwrap() {
        return (AbstractByteBuf) super.unwrap();
    }

    @Override
    protected byte _getByte(int index) {
        return unwrap()._getByte(idx(index));
    }
    //其他get方法思路基本相同
	...

}


//AbstractUnpooledSlicedByteBuf,可以理解为字节buf的静态代理
abstract class AbstractUnpooledSlicedByteBuf extends AbstractDerivedByteBuf {
    private final ByteBuf buffer;//内部字节buf
    private final int adjustment;
    //获取内部字节buf
    @Override
    public ByteBuf unwrap() {
        return buffer;
    }
     @Override
    public byte getByte(int index) {
        checkIndex0(index, 1);
        return unwrap().getByte(idx(index));
    }
    @Override
    protected byte _getByte(int index) {
        return unwrap().getByte(idx(index));
    }
    //其他字节buf的相关方法,都是委托给内部字节buf
    ...
 }


//AbstractDerivedByteBuf,可以理解实现了引用计数器的抽象字节buf
public abstract class AbstractDerivedByteBuf extends AbstractByteBuf {

    protected AbstractDerivedByteBuf(int maxCapacity) {
        super(maxCapacity);
    }

    @Override
    public final int refCnt() {
        return refCnt0();
    }

    int refCnt0() {
        return unwrap().refCnt();
    }

    @Override
    public final ByteBuf retain() {
        return retain0();
    }

    ByteBuf retain0() {
        unwrap().retain();
        return this;
    }
    ...
}


//ByteBuf
/**
 * Return the underlying buffer instance if this buffer is a wrapper of another buffer.
 *
 * @return {@code null} if this buffer is not a wrapper
 */
public abstract ByteBuf unwrap();

从上面可以看出retainedSlice和slice方法返回则的字节buf,实际为字节buf底层unwrap buf,
可以理解为字节buf的快照或引用,数据更改相互影响,retainedSlice方法会增加字节buf的引用计数器。

再来看读部分数据到字节序列
@Override
public CharSequence readCharSequence(int length, Charset charset) {
    CharSequence sequence = getCharSequence(readerIndex, length, charset);
    readerIndex += length;
    return sequence;
}


再来看skipBytes方法
//跳过length长度的字节,只更新读索引,不删除实际buf数据
@Override
public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    readerIndex += length;
    return this;
}


再来看write*相关方法:

@Override
public ByteBuf writeByte(int value) {
    ensureAccessible();
    //确保buf,足够容下1个字节数据
    ensureWritable0(1);
    //委托给_setByte,并更新写索引
    _setByte(writerIndex++, value);
    return this;
}

//确保buf,足够容下minWritableBytes个字节数据
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }
    ensureWritable0(minWritableBytes);
    return this;
}

private void ensureWritable0(int minWritableBytes) {
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2. 计算新的buf容量
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // Adjust to the new capacity.
    //调整容量
    capacity(newCapacity);
}

@Override
public int ensureWritable(int minWritableBytes, boolean force) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }
    if (minWritableBytes <= writableBytes()) {
        //不够写
        return 0;
    }
    final int maxCapacity = maxCapacity();
    final int writerIndex = writerIndex();
    if (minWritableBytes > maxCapacity - writerIndex) {
        if (!force || capacity() == maxCapacity) {
            return 1;
        }
	//扩展至最大容量
        capacity(maxCapacity);
        return 3;
    }
    //计算新的容量,并更新
    // Normalize the current capacity to the power of 2.
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // Adjust to the new capacity.
    capacity(newCapacity);
    return 2;
}

再来看写int
@Override
public ByteBuf writeInt(int value) {
    ensureAccessible();
    //确保buf,足够容下4个字节数据
    ensureWritable0(4);
    _setInt(writerIndex, value);
    writerIndex += 4;//更新写索引
    return this;
}

其他write*原始类型方法,思路基本相同;

再来看writeBytes相关方法

将字节数组数据写到当前buf

@Override
public ByteBuf writeBytes(byte[] src) {
    writeBytes(src, 0, src.length);
    return this;
}

@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureAccessible();
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}


将字节buf数据写到当前buf

@Override
public ByteBuf writeBytes(ByteBuf src) {
    writeBytes(src, src.readableBytes());
    return this;
}

@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
    if (length > src.readableBytes()) {
        throw new IndexOutOfBoundsException(String.format(
                "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
    }
    writeBytes(src, src.readerIndex(), length);
    src.readerIndex(src.readerIndex() + length);
    return this;
}

@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
    ensureAccessible();
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}


再来看将nio 字节buf,InputStream,ScatteringByteChannel,FileChannel中的数据写到
当前buf

@Override
public ByteBuf writeBytes(ByteBuffer src) {
    ensureAccessible();
    int length = src.remaining();
    ensureWritable(length);
    setBytes(writerIndex, src);
    writerIndex += length;
    return this;
}

//ByteBuf
public abstract ByteBuf setBytes(int index, ByteBuffer src);


@Override
public int writeBytes(InputStream in, int length)
        throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

//ByteBuf
public abstract int setBytes(int index, InputStream in, int length) throws IOException;


@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}


//ByteBuf
public abstract int setBytes(int index, ScatteringByteChannel in, int length) throws IOException;


@Override
public int writeBytes(FileChannel in, long position, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, position, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}


//ByteBuf
public abstract int setBytes(int index, FileChannel in, long position, int length) throws IOException;


//写字节序列
@Override
public int writeCharSequence(CharSequence sequence, Charset charset) {
    int written = setCharSequence(writerIndex, sequence, charset);
    writerIndex += written;
    return written;
}

从上面可以看出write*原始类型方法会修改当前buf写索引,writeBytes(...,ByteBuf,...)方法会修改当前buf写索引,同时会修改目的buf的读索引,writeBytes(...,byte[],...)方法会修改当前buf写索引。write*操作实际委托个set*的相关操作,同时更新buf写索引。

再来看复制方法:
@Override
public ByteBuf copy() {
    return copy(readerIndex, readableBytes());
}

//ByteBuf
public abstract ByteBuf copy(int index, int length);



slice和retainedSlice相关的方法,我们在前面已说,这里仅仅展示一下:
@Override
public ByteBuf slice() {
    return slice(readerIndex, readableBytes());
}

@Override
public ByteBuf retainedSlice() {
    return slice().retain();
}

@Override
public ByteBuf slice(int index, int length) {
    return new UnpooledSlicedByteBuf(this, index, length);
}

@Override
public ByteBuf retainedSlice(int index, int length) {
    return slice(index, length).retain();
}


来看完全复制duplicate
@Override
public ByteBuf duplicate() {
    return new UnpooledDuplicatedByteBuf(this);
}

@Override
public ByteBuf retainedDuplicate() {
    return duplicate().retain();
}

//UnpooledDuplicatedByteBuf
class UnpooledDuplicatedByteBuf extends DuplicatedByteBuf {
    UnpooledDuplicatedByteBuf(AbstractByteBuf buffer) {
        super(buffer);
    }

    @Override
    public AbstractByteBuf unwrap() {
        return (AbstractByteBuf) super.unwrap();
    }

    @Override
    protected byte _getByte(int index) {
        return unwrap()._getByte(index);
    }
    //其他方法思路一样委托给内存的buf
    ...
}


//可以理解为buf的静态代理
@Deprecated
public class DuplicatedByteBuf extends AbstractDerivedByteBuf {

    private final ByteBuf buffer;

    public DuplicatedByteBuf(ByteBuf buffer) {
        this(buffer, buffer.readerIndex(), buffer.writerIndex());
    }
    @Override
    public byte getByte(int index) {
        return unwrap().getByte(index);
    }
    其他方法思路一样委托给内存的buf
    ...
}

从上面可以看出:retainedDuplicate和duplicate方法返回则的字节buf,实际为字节buf底层unwrap buf,可以理解为字节buf的快照或引用,数据更改相互影响,retainedDuplicate方法会增加字节buf的引用计数器。

再来看转化nio ByteBuffer
@Override
public ByteBuffer nioBuffer() {
    return nioBuffer(readerIndex, readableBytes());
}

@Override
public ByteBuffer[] nioBuffers() {
    return nioBuffers(readerIndex, readableBytes());
}

//ByteBuf
public abstract ByteBuffer[] nioBuffers(int index, int length);

总结:

字节buf内部有两个索引,一个读索引,一个写索引,两个索引标记,即读写索引对应的标记,buf的最大容量为maxCapacity;buf的构造,主要是初始化最大容量。

弃已读数据方法discardReadBytes,丢弃buf数据时,只修改读写索引和相应的标记,并不删除数据。

get*原始类型方法不会修改当前buf读写索引,getBytes(...,ByteBuf,...)方法不会修改当前buf读写索引,会修改目的buf的写索引。getBytes(...,byte[],...)方法不会修改当前buf读写索引。

set*原始类型方法不会修改当前buf读写索引,setBytes(...,ByteBuf,...)方法不会修改当前buf读写索引,会修改源buf的读索引。setBytes(...,byte[],...)方法不会修改当前buf读写索引。

read*原始类型方法会修改当前buf读索引,readBytes(...,ByteBuf,...)方法会修改当前buf读索引,同时会修改目的buf的写索引,readBytes(...,byte[],...)方法会修改当前buf读索引。
read*操作实际委托个get*的相关操作,同时更新buf读索引。

跳过length长度的字节,只更新读索引,不删除实际buf数据。

retainedSlice和slice方法返回则的字节buf,实际为字节buf底层unwrap buf,可以理解为字节buf的快照或引用,数据更改相互影响,retainedSlice方法会增加字节buf的引用计数器。

write*原始类型方法会修改当前buf写索引,writeBytes(...,ByteBuf,...)方法会修改当前buf写索引,同时会修改目的buf的读索引,readBytes(...,byte[],...)方法会修改当前buf写索引。
write*操作实际委托个set*的相关操作,同时更新buf写索引。

retainedDuplicate和duplicate方法返回则的字节buf,实际为字节buf底层unwrap buf,可以理解为字节buf的快照或引用,数据更改相互影响,retainedDuplicate方法会增加字节buf的引用计数器。



附:

//字节查找
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
    return ByteBufUtil.indexOf(this, fromIndex, toIndex, value);
}


//ByteBufUtil

/
**
 * The default implementation of {@link ByteBuf#indexOf(int, int, byte)}.
 * This method is useful when implementing a new buffer type.
 */
public static int indexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
    if (fromIndex <= toIndex) {
        return firstIndexOf(buffer, fromIndex, toIndex, value);
    } else {
        return lastIndexOf(buffer, fromIndex, toIndex, value);
    }
}


private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
    fromIndex = Math.max(fromIndex, 0);
    if (fromIndex >= toIndex || buffer.capacity() == 0) {
        return -1;
    }

    return buffer.forEachByte(fromIndex, toIndex - fromIndex, new ByteProcessor.IndexOfProcessor(value));
}

private static int lastIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
    fromIndex = Math.min(fromIndex, buffer.capacity());
    if (fromIndex < 0 || buffer.capacity() == 0) {
        return -1;
    }

    return buffer.forEachByteDesc(toIndex, fromIndex - toIndex, new ByteProcessor.IndexOfProcessor(value));
}

//ByteProcessor
public interface ByteProcessor {
    /**
     * A {@link ByteProcessor} which finds the first appearance of a specific byte.
     */
    class IndexOfProcessor implements ByteProcessor {
        private final byte byteToFind;

        public IndexOfProcessor(byte byteToFind) {
            this.byteToFind = byteToFind;
        }

        @Override
        public boolean process(byte value) {
            return value != byteToFind;
        }
    }
    ...
}


//获取字节在buf中的第一个索引位置与开始索引之前的长度
@Override
public int bytesBefore(byte value) {
    return bytesBefore(readerIndex(), readableBytes(), value);
}

@Override
public int bytesBefore(int length, byte value) {
    checkReadableBytes(length);
    return bytesBefore(readerIndex(), length, value);
}

@Override
public int bytesBefore(int index, int length, byte value) {
    //委托给indexOf
    int endIndex = indexOf(index, index + length, value);
    if (endIndex < 0) {
        return -1;
    }
    return endIndex - index;
}

//遍历buf所有字节,调用处理器,处理
升序方式
@Override
public int forEachByte(ByteProcessor processor) {
    ensureAccessible();
    try {
        return forEachByteAsc0(readerIndex, writerIndex, processor);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
        return -1;
    }
}

@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
    checkIndex(index, length);
    try {
        return forEachByteAsc0(index, index + length, processor);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
        return -1;
    }
}

private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
    for (; start < end; ++start) {
        if (!processor.process(_getByte(start))) {
            return start;
        }
    }

    return -1;
}


倒序方式

@Override
public int forEachByteDesc(ByteProcessor processor) {
    ensureAccessible();
    try {
        return forEachByteDesc0(writerIndex - 1, readerIndex, processor);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
        return -1;
    }
}

@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
    checkIndex(index, length);
    try {
        return forEachByteDesc0(index + length - 1, index, processor);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
        return -1;
    }
}

private int forEachByteDesc0(int rStart, final int rEnd, ByteProcessor processor) throws Exception {
    for (; rStart >= rEnd; --rStart) {
        if (!processor.process(_getByte(rStart))) {
            return rStart;
        }
    }
    return -1;
}


@Override
public int hashCode() {
    return ByteBufUtil.hashCode(this);
}



@Override
public boolean equals(Object o) {
    return this == o || (o instanceof ByteBuf && ByteBufUtil.equals(this, (ByteBuf) o));
}

@Override
public int compareTo(ByteBuf that) {
    return ByteBufUtil.compare(this, that);
}

@Override
public String toString() {
    if (refCnt() == 0) {
        return StringUtil.simpleClassName(this) + "(freed)";
    }

    StringBuilder buf = new StringBuilder()
        .append(StringUtil.simpleClassName(this))
        .append("(ridx: ").append(readerIndex)
        .append(", widx: ").append(writerIndex)
        .append(", cap: ").append(capacity());
    if (maxCapacity != Integer.MAX_VALUE) {
        buf.append('/').append(maxCapacity);
    }

    ByteBuf unwrapped = unwrap();
    if (unwrapped != null) {
        buf.append(", unwrapped: ").append(unwrapped);
    }
    buf.append(')');
    return buf.toString();
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics