`
Donald_Draper
  • 浏览: 950732 次
社区版块
存档分类
最新评论

netty 通道Outbound缓冲区

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
引言:
前面篇文章我们看了抽象通道的内部类抽象Unsafe,先来回顾一下:
      抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。
      通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给
doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。地址绑定方法委托给doBind,待子类实现。
      关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。
     写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加消息到OutBound Buf中。
     刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。
在抽象通道的变量声明中,我们看到一个有Outbound buf,在抽象Unsafe的中的写消息,实际为将消息添加的Outbound buf中,刷新操作,即将Outbound buf中为未刷新的消息队列添加到刷新队列中,然后发送刷新队列消息。
今天我们就来看一下ChannelOutboundBuffer
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
 * outbound write requests.
 抽象通道用通道Outbound  buf 存放写请求消息
 * 
 * All methods must be called by a transport implementation from an I/O thread, except the following ones:
 * [list]
 * [*]{@link #size()} and {@link #isEmpty()}

 * [*]{@link #isWritable()}

 * [*]{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}

 * [/list]
 * 

 */
public final class ChannelOutboundBuffer {
    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 8 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
    //Entry buf 头部数据size
    static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
    //通道Outbound buf 线程本地Buf,存放刷新链表中的写请求消息
    private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
        }
    };
   //buf 关联通道
    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //
    // The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;刷新写请求链的链头
    // The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;//未刷新的写请求链的链头
    // The Entry which represents the tail of the buffer
    private Entry tailEntry;
    // The number of flushed entries that are not written yet
    private int flushed;//刷新Entry链上待发送的写请求数

    private int nioBufferCount;//当前待发送的消息buf数量
    private long nioBufferSize;//当前待发送的所有消息buf的字节数

    private boolean inFail;//是否刷新失败
    //通道待发送的字节数
    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

    @SuppressWarnings("UnusedDeclaration")
    private volatile long totalPendingSize;

    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    @SuppressWarnings("UnusedDeclaration")
    private volatile int unwritable;//通道写状态
   //触发通道ChannelWritabilityChanged事件任务线程
    private volatile Runnable fireChannelWritabilityChangedTask;

    ChannelOutboundBuffer(AbstractChannel channel) {
        this.channel = channel;
    }
}

从上面来看,通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,
一个未刷新的buf链表和一个刷新buf链表。

我们来看一下buf Entry的定义
static final class Entry {
   //Entry回收器
    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
        @Override
        protected Entry newObject(Handle<Entry> handle) {
            return new Entry(handle);
        }
    };

    private final Handle<Entry> handle;//Entry 对象回收Handle
    Entry next;//后继
    Object msg;//消息对象
    ByteBuffer[] bufs;//存放消息的字节buf数组
    ByteBuffer buf;//存放消息的buf
    ChannelPromise promise;//写请求任务
    long progress;
    long total;//消息size
    int pendingSize;//Entry消息字节数
    int count = -1;//消息当前buf的数量
    boolean cancelled;

    private Entry(Handle<Entry> handle) {
        this.handle = handle;
    }
    //创建Entry实例
    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//待发送的消息字节数
        entry.total = total;
        entry.promise = promise;
        return entry;
    }
    //取消写请求
    int cancel() {
        if (!cancelled) {
            cancelled = true;
            int pSize = pendingSize;
            // release message and replace with an empty buffer
	    //释放消息对象
            ReferenceCountUtil.safeRelease(msg);
            msg = Unpooled.EMPTY_BUFFER;

            pendingSize = 0;
            total = 0;
            progress = 0;
            bufs = null;
            buf = null;
            return pSize;
        }
        return 0;
    }
    //回收写请求,委托给
    void recycle() {
        next = null;
        bufs = null;
        buf = null;
        msg = null;
        promise = null;
        progress = 0;
        total = 0;
        pendingSize = 0;
        count = -1;
        cancelled = false;
        handle.recycle(this);
    }
    //回收Entry,返回后继写请求
    Entry recycleAndGetNext() {
        Entry next = this.next;
        recycle();
        return next;
    }
}

通道写消息时,消息将会被包装成写请求Entry。

来看添加消息到通道Outbound缓冲区
/**
 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
   //包装消息,为写请求Entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    //在添加消息到未刷新写请求链表后,更新待发送的字节数
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

//计算消息size
private static long total(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof FileRegion) {
        return ((FileRegion) msg).count();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    return -1;
}

//在添加消息到未刷新写请求链表后,更新待发送的字节数
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    //如果待发送的字节数,大于通道写buf大小,则更新通道可状态
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        //更新通道写状态
        setUnwritable(invokeLater);
    }
}

//ChannelConfig
/**
 * Returns the high water mark of the write buffer.  If the number of bytes
 * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
 * will start to return {@code false}.
 */
int getWriteBufferHighWaterMark();


//更新通道写状态
private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
	        //写状态改变,则触发通道ChannelWritabilityChanged事件
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}


private void fireChannelWritabilityChanged(boolean invokeLater) {
    final ChannelPipeline pipeline = channel.pipeline();
    if (invokeLater) {
       //如果需要延时通知,则创建ChannelWritabilityChanged事件触发任务线程
        Runnable task = fireChannelWritabilityChangedTask;
        if (task == null) {
            fireChannelWritabilityChangedTask = task = new Runnable() {
                @Override
                public void run() {
		    //通知通道,通道写状态已改变
                    pipeline.fireChannelWritabilityChanged();
                }
            };
        }
	//将ChannelWritabilityChanged事件触发线程,交由通道事件循环执行
        channel.eventLoop().execute(task);
    } else {
        //否则直接触发ChannelWritabilityChanged事件
        pipeline.fireChannelWritabilityChanged();
    }
}


从上面可以看出,添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,
将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,
如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发通道ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。

//更新通道待发送字节数的另一个版本,与上面不同道是,如果
通道待发送的字节数大于通道写bufsize,延时触发通道ChannelWritabilityChanged事件。
/**
  * Increment the pending bytes which will be written at some point.
  * This method is thread-safe!
  */
 void incrementPendingOutboundBytes(long size) {
     incrementPendingOutboundBytes(size, true);
 }

再来看刷新操作:
/**
 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
 * and so you will be able to handle them.
 刷新通道Outbound缓存区,即将先前添加的消息,标记为刷新
 */
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
	//遍历未刷新写请求链表,将写请求添加到刷新链表中
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
		//如果写请求取消,则更新通道待发送字节数
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
	//置空未刷新写请求链表
        unflushedEntry = null;
    }
}

我们来看:
//如果写请求取消,则更新通道待发送字节数
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);




private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    //更新通道待发送字节数
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    //待发送字节数消息,小于通道配置的写buf size
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        //更新通道可写状态
        setWritable(invokeLater);
    }
}


//ChannelConfig
/**
 * Returns the low water mark of the write buffer.  Once the number of bytes
 * queued in the write buffer exceeded the
 * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
 * dropped down below this value, {@link Channel#isWritable()} will start to return
 * {@code true} again.
 一旦通道待发送字节数大于通道写buf的 high water mark,则丢弃如下值,Channel#isWritable将返回true
 */
int getWriteBufferLowWaterMark();


//更新通道可写状态
private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}


//更新通道待发送字节数,另一个版本
/**
 * Decrement the pending bytes which will be written at some point.
 * This method is thread-safe!
 */
void decrementPendingOutboundBytes(long size) {
    decrementPendingOutboundBytes(size, true, true);
}

从上面可以出,添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,
如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。



再来看移除操作:
/**
 * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
 * flushed message exists at the time this method is called it will return {@code false} to signal that no more
 * messages are ready to be handled.
 移除当前消息,并标记通道异步任务为成功,并返回true。如果没有刷新消息存在,则返回false,表示没有消息需要处理
 */
public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
         //刷新消息链为空,则清除NioBuffer
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    //移除写请求Entry
    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
	//写请求没有取消,则释放消息,更新任务结果,更新当前通道待发送字节数和可写状态,并触发相应的事件
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }
    //取消,则回收
    // recycle the entry
    e.recycle();

    return true;
}

移除操作有几点要看:
1.
if (e == null) {
     //刷新消息链为空,则清除NioBuffer
    clearNioBuffers();
    return false;
}


// Clear all ByteBuffer from the array so these can be GC'ed.
// See https://github.com/netty/netty/issues/3837
private void clearNioBuffers() {
    int count = nioBufferCount;
    if (count > 0) {
        //重置nio buf计数器,填充线程本地nio buf数组为空。
        nioBufferCount = 0;
        Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
    }
}

2.
//移除写请求Entry
removeEntry(e);


private void removeEntry(Entry e) {
    if (-- flushed == 0) {//刷新链为空
        // processed everything
        flushedEntry = null;
        if (e == tailEntry) {//链尾
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
       //否则,刷新链头往后移一位
        flushedEntry = e.next;
    }
}


从上面可以看出移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,
更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件

再来看发送消息异常时,移除写请求
/**
  * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
  * and return {@code true}. If no   flushed message exists at the time this method is called it will return
  * {@code false} to signal that no more messages are ready to be handled.
  */
 public boolean remove(Throwable cause) {
     return remove0(cause, true);
 }

 private boolean remove0(Throwable cause, boolean notifyWritability) {
     Entry e = flushedEntry;
     if (e == null) {
     //刷新消息链为空,则清除NioBuffer
         clearNioBuffers();
         return false;
     }
     Object msg = e.msg;

     ChannelPromise promise = e.promise;
     int size = e.pendingSize;
     //移除写请求Entry
     removeEntry(e);

     if (!e.cancelled) {
         // only release message, fail and decrement if it was not canceled before.
	 //写请求没有取消,则释放消息,更新任务结果,更新当前通道待发送字节数和可写状态,并触发相应的事件
         ReferenceCountUtil.safeRelease(msg);
         
         safeFail(promise, cause);
         decrementPendingOutboundBytes(size, false, notifyWritability);
     }
      //取消,则回收
     // recycle the entry
     e.recycle();

     return true;
 }



/**
 * Removes the fully written entries and update the reader index of the partially written entry.
 * This operation assumes all messages in this buffer is {@link ByteBuf}.

 从刷新写请求链表,移除writtenBytes个字节数
 */
public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();//获取当前写请求消息
        if (!(msg instanceof ByteBuf)) {
	    //写请求非ByteBuf实例,且writtenBytes为0
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
	//获取消息buf的读指针
        final int readerIndex = buf.readerIndex();
	//获取buf中可读的字节数
        final int readableBytes = buf.writerIndex() - readerIndex;
        //如果可读字节数小于,需要移除的字节数
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
	        //则更新写请求任务进度
                progress(readableBytes);
		//更新移除字节数
                writtenBytes -= readableBytes;
            }
	    //移除链头写请求消息
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
	       //如果可读字节数大于需要移除的字节数,则移动消息buf的读索引到readerIndex + (int) writtenBytes位置
                buf.readerIndex(readerIndex + (int) writtenBytes);
		//则更新写请求任务进度
                progress(writtenBytes);
            }
            break;
        }
    }
    //最后清除nio buffer
    clearNioBuffers();
}

/**
 * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
 返回当前需要发送的消息
 */
public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}



/**
 * Notify the {@link ChannelPromise} of the current message about writing progress.
 获取通道刷新任务的进度
 */
public void progress(long amount) {
    Entry e = flushedEntry;
    assert e != null;
    ChannelPromise p = e.promise;
    if (p instanceof ChannelProgressivePromise) {
        long progress = e.progress + amount;
        e.progress = progress;
        ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
    }
}


从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。



再来看将刷新链上的写请求消息,添加到nio buffer数组中:
/**
 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
 * array and the total number of readable bytes of the NIO buffers respectively.
 将刷新链中的写请求对象消息放到nio buf数组中。#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度
 和可读字节数
 * 
 * Note that the returned array is reused and thus should not escape
 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
 * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
 返回的nio buf将会被 NioSocketChannel#doWrite方法重用
 * 

 */
public ByteBuffer[] nioBuffers() {
    long nioBufferSize = 0;//nio buf数组中的字节数
    int nioBufferCount = 0;//nio buf数组长度
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    //获取通道Outbound缓存区线程本地的niobuf数组
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    Entry entry = flushedEntry;
    //遍历刷新链,链上的写请求Entry的消息必须为ByteBuf
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        if (!entry.cancelled) {
	    //在写请求没有取消的情况下,获取写请求消息buf,及buf的读索引,和可读字节数
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes > 0) {
                if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {
		    //如果消息buf可读字节数+nioBufferSize大于整数的最大值,则跳出循环
                    // If the nioBufferSize + readableBytes will overflow an Integer we stop populate the
                    // ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then
                    // Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will
                    // raise an IOException. On Linux it may work depending on the
                    // architecture and kernel but to be safe we also enforce the limit here.
                    // This said writing more the Integer.MAX_VALUE is not a good idea anyway.
                    //
                    // See also:
                    // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                    // - http://linux.die.net/man/2/writev
                    break;
                }
		//更新buf的size
                nioBufferSize += readableBytes;
                int count = entry.count;
                if (count == -1) {
                    //noinspection ConstantValueVariableUse
                    entry.count = count =  buf.nioBufferCount();
                }
		//需要buf的数量
                int neededSpace = nioBufferCount + count;
		//如果buf需求数量大于当前nio buf数组
                if (neededSpace > nioBuffers.length) {
		    //则扩容nio数组为原来的两倍,
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
		    //更新nio buf数组
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                if (count == 1) {
		    //如果需要的buf数量为1,则获取写请求的buf
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                        // derived buffer
			//如果buf为空,则创建一个buf实例
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
		    //将消息buf,添加到nio buf数组中
                    nioBuffers[nioBufferCount ++] = nioBuf;
                } else {
		    //否则获取写请求的buf数组
                    ByteBuffer[] nioBufs = entry.bufs;
                    if (nioBufs == null) {
                        // cached ByteBuffers as they may be expensive to create in terms
                        // of Object allocation
			//分配buf数组
                        entry.bufs = nioBufs = buf.nioBuffers();
                    }
		    //添加写请求buf数组到通道Outbound缓存区的nio buf数组中
                    nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
                }
            }
        }
        entry = entry.next;
    }
    //更新当前nio buffer 计数器和字节数
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;
}

1.
//则扩容nio数组
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
    int newCapacity = array.length;
    do {
        // double capacity until it is big enough
        // See https://github.com/netty/netty/issues/1890
	//则扩容nio数组为原来的两倍
        newCapacity <<= 1;

        if (newCapacity < 0) {
            throw new IllegalStateException();
        }

    } while (neededSpace > newCapacity);
    ByteBuffer[] newArray = new ByteBuffer[newCapacity];
    //拷贝原始中size buf到新的的buf数组中
    System.arraycopy(array, 0, newArray, 0, size);

    return newArray;
}


2.
//添加写请求buf数组到通道Outbound缓存区的nio buf数组中
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
    //遍历添加的buf数组,添加到缓存区的nio buf数组中
    for (ByteBuffer nioBuf: nioBufs) {
        if (nioBuf == null) {
            break;
        }
        nioBuffers[nioBufferCount ++] = nioBuf;
    }
    return nioBufferCount;
}


从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。


再来看其他方法:
//刷新失败
void failFlushed(Throwable cause, boolean notify) {
    // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
    // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
    // indirectly (usually by closing the channel.)
    //
    // See https://github.com/netty/netty/issues/1501
    if (inFail) {
        //已失败,直接返回
        return;
    }
    try {
        inFail = true;
        for (;;) {
	    //移除刷新链头部
            if (!remove0(cause, notify)) {
                break;
            }
        }
    } finally {
        inFail = false;
    }
}

//关闭Outbound缓存区
void close(final ClosedChannelException cause) {
    if (inFail) {
        //已将刷新失败,则创建关闭任务线程,委托给事件循环执行
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                close(cause);
            }
        });
        return;
    }

    inFail = true;

    if (channel.isOpen()) {
        throw new IllegalStateException("close() must be invoked after the channel is closed.");
    }

    if (!isEmpty()) {
        throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
    }

    // Release all unflushed messages.
    try {
        Entry e = unflushedEntry;
	//遍历未刷新的写请求,更新写任务失败
        while (e != null) {
            // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
            int size = e.pendingSize;
	    //更新通道待发送的字节数
            TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);

            if (!e.cancelled) {
	        //如果写任务没有取消,则释放消息,更新任务状态为失败
                ReferenceCountUtil.safeRelease(e.msg);
                safeFail(e.promise, cause);
            }
	    //回收写请求
            e = e.recycleAndGetNext();
        }
    } finally {
        inFail = false;
    }
    //清除Nio buf数组
    clearNioBuffers();
}

//更新任务成功
private static void safeSuccess(ChannelPromise promise) {
    // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
    // false.
    PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
}


//PromiseNotificationUtil
/**
 * Try to mark the {@link Promise} as success and log if {@code logger} is not {@code null} in case this fails.
 */
public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) {
    if (!p.trySuccess(result) && logger != null) {
        Throwable err = p.cause();
        if (err == null) {
            logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p);
        } else {
            logger.warn(
                    "Failed to mark a promise as success because it has failed already: {}, unnotified cause:",
                    p, err);
        }
    }
}


//更新任务失败
private static void safeFail(ChannelPromise promise, Throwable cause) {
    // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
    // false.
    PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}

//回收缓冲区
@Deprecated
public void recycle() {
    // NOOP
}
//通道Outbound缓冲区中可写的字节数
public long totalPendingWriteBytes() {
    return totalPendingSize;
}


/**
 * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
 * obtained via {@link #nioBuffers()}. This method [b]MUST[/b] be called after {@link #nioBuffers()}
 * was called.
 此方法必须在#nioBuffers方法后调用,用于获取当前缓冲区可写的字节buf数组的长度
 */
public int nioBufferCount() {
    return nioBufferCount;
}

/**
 * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
 * obtained via {@link #nioBuffers()}. This method [b]MUST[/b] be called after {@link #nioBuffers()}
 * was called.
  此方法必须在#nioBuffers方法后调用,用于获取当前缓冲区可写的字节buf数组中的字节数
 */
 */
public long nioBufferSize() {
    return nioBufferSize;
}

/**
 * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
 * not exceed the write watermark of the {@link Channel} and
 * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 * {@code false}.
 缓冲区是否可写,当且仅当#totalPendingWriteBytes方法的通道可写字节数,不超过通道的写watermark,同时
 用户没有使用#setUserDefinedWritability,设置可写标志为false
 */
public boolean isWritable() {
    return unwritable == 0;
}

/**
 * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
 * {@code true}.
 获取用于定义的可写标志
 */
public boolean getUserDefinedWritability(int index) {
    return (unwritable & writabilityMask(index)) == 0;
}

//可写掩码
private static int writabilityMask(int index) {
    if (index < 1 || index > 31) {
        throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
    }
    return 1 << index;
}


/**
 * Sets a user-defined writability flag at the specified index.
 根据可写状态,设置用户定义的可写标志
 */
public void setUserDefinedWritability(int index, boolean writable) {
    if (writable) {
        setUserDefinedWritability(index);
    } else {
        clearUserDefinedWritability(index);
    }
}
//设置用户定义的可写标志
private void setUserDefinedWritability(int index) {
    final int mask = ~writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & mask;
	//更新可写状态
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
	        //通知通道可写状态已改变
                fireChannelWritabilityChanged(true);
            }
            break;
        }
    }
}
//清除可写状态
private void clearUserDefinedWritability(int index) {
    final int mask = writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
                fireChannelWritabilityChanged(true);
            }
            break;
        }
    }
}


/**
 * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
 返回通道Outbound缓冲区中需要刷新的消息数
 */
public int size() {
    return flushed;
}

/**
 * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
 * otherwise.
 如果没有需要刷新的消息,则返回true
 */
public boolean isEmpty() {
    return flushed == 0;
}

/**
 * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
 * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
 返回直到通道不可写,通道Outbound缓冲区还可以写多少字节的数。如果通道不可写,则返回0
 */
public long bytesBeforeUnwritable() {
    //通道写缓存区大小-当前缓冲区字节数
    long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
    // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
    // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
    // together. totalPendingSize will be updated before isWritable().
    if (bytes > 0) {
        return isWritable() ? bytes : 0;
    }
    return 0;
}

/**
 * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
 * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
 获取直到通道可写,通道底层buf有多少字节数据需要发送。如果可写返回0
 */
public long bytesBeforeWritable() {
    //当前缓冲区字节数-通道写缓存的rLowWaterMark
    long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
    // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
    // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
    // together. totalPendingSize will be updated before isWritable().
    if (bytes > 0) {
        return isWritable() ? 0 : bytes;
    }
    return 0;
}


/**
 * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
 * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
 * returns {@code false} or there are no more flushed messages to process.
 当刷新消息时,调用消息处理器的处理消息方法#processMessage,处理每个消息,直到#processMessage方法
 返回false,或没有消息需要刷新
 */
public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
    if (processor == null) {
        throw new NullPointerException("processor");
    }
    Entry entry = flushedEntry;
    if (entry == null) {
        return;
    }
    //遍历刷新链表
    do {
        if (!entry.cancelled) {
	    //如果写任务没有取消,则处理消息
            if (!processor.processMessage(entry.msg)) {
                return;
            }
        }
        entry = entry.next;
    } while (isFlushedEntry(entry));
}
//判断写请求是否在刷新链表上
private boolean isFlushedEntry(Entry e) {
    return e != null && e != unflushedEntry;
}
//消息处理器
public interface MessageProcessor {
    /**
     * Will be called for each flushed message until it either there are no more flushed messages or this
     * method returns {@code false}.
     在刷新消息时,将会调用
     */
    boolean processMessage(Object msg) throws Exception;
}




总结:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。

添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。

添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。

移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件

从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。

将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。




附:
package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * Light-weight object pool based on a thread-local stack.
 *基于线程本地栈的轻量级对象池
 * @param <T> the type of the pooled object
 */
public abstract class Recycler<T> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);

    @SuppressWarnings("rawtypes")
    private static final Handle NOOP_HANDLE = new Handle() {
        @Override
        public void recycle(Object object) {
            // NOOP
        }
    };
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
    private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
    private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 32768; // Use 32k instances as default.
    private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
    private static final int INITIAL_CAPACITY;
    private static final int MAX_SHARED_CAPACITY_FACTOR;
    private static final int MAX_DELAYED_QUEUES_PER_THREAD;
    private static final int LINK_CAPACITY;
    private static final int RATIO;

    static {
        // In the future, we might have different maxCapacity for different object types.
        // e.g. io.netty.recycler.maxCapacity.writeTask
        //      io.netty.recycler.maxCapacity.outboundBuffer
        int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
        if (maxCapacityPerThread < 0) {
            maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
        }

        DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;

        MAX_SHARED_CAPACITY_FACTOR = max(2,
                SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
                        2));

        MAX_DELAYED_QUEUES_PER_THREAD = max(0,
                SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
                        // We use the same value as default EventLoop number
                        NettyRuntime.availableProcessors() * 2));

        LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
                max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));

        // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
        // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
        // bursts.
        RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

        if (logger.isDebugEnabled()) {
            if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
                logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
                logger.debug("-Dio.netty.recycler.ratio: disabled");
            } else {
                logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
                logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
                logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
                logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
            }
        }

        INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
    }

    private final int maxCapacityPerThread;//每个线程的最大对象容量
    private final int maxSharedCapacityFactor;//容量共享因子
    private final int ratioMask;
    private final int maxDelayedQueuesPerThread;
    //回收器线程本地对象栈
    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);
        }
    };

    protected Recycler() {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread) {
        this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
        this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread) {
        ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
        if (maxCapacityPerThread <= 0) {
            this.maxCapacityPerThread = 0;
            this.maxSharedCapacityFactor = 1;
            this.maxDelayedQueuesPerThread = 0;
        } else {
            this.maxCapacityPerThread = maxCapacityPerThread;
            this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
            this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
        }
    }
    //获取本线程的对象
    @SuppressWarnings("unchecked")
    public final T get() {
        if (maxCapacityPerThread == 0) {
	    //创建对象
            return newObject((Handle<T>) NOOP_HANDLE);
        }
	//否则从栈pop一个handle对象,
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
	    //handle对象为空,则创建对象
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
	//否则返回handle对象的值
        return (T) handle.value;
    }

    //创建对象,在子列扩展
    protected abstract T newObject(Handle<T> handle);

    public interface Handle<T> {
        void recycle(T object);
    }


    static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;

        private Stack<?> stack;
        private Object value;//Handle对象值

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
	    //回收对象,将对象放入线程本地栈
            stack.push(this);
        }
    }
     /**
     * @deprecated use {@link Handle#recycle(Object)}.
     回收对象
     */
    @Deprecated
    public final boolean recycle(T o, Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;
        }

        DefaultHandle<T> h = (DefaultHandle<T>) handle;
        if (h.stack.parent != this) {
            return false;
        }

        h.recycle(o);
        return true;
    }
    //延时回收队列
    private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
            new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
        @Override
        protected Map<Stack<?>, WeakOrderQueue> initialValue() {
            return new WeakHashMap<Stack<?>, WeakOrderQueue>();
        }
    };

     static final class Stack<T> {

        // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
        // than the stack owner recycles: when we run out of items in our stack we iterate this collection
        // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
        // still recycling all items.
        final Recycler<T> parent;
        final Thread thread;
        final AtomicInteger availableSharedCapacity;
        final int maxDelayedQueues;

        private final int maxCapacity;
        private final int ratioMask;
        private DefaultHandle<?>[] elements;
        private int size;
        private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
	...
   }
    // a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
    // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
    private static final class WeakOrderQueue {

        static final WeakOrderQueue DUMMY = new WeakOrderQueue();

        // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
        @SuppressWarnings("serial")
        private static final class Link extends AtomicInteger {
            private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

            private int readIndex;
            private Link next;
        }

        // chain of data items
        private Link head, tail;
        // pointer to another queue of delayed items for the same stack
        private WeakOrderQueue next;
        private final WeakReference<Thread> owner;
        private final int id = ID_GENERATOR.getAndIncrement();
        private final AtomicInteger availableSharedCapacity;

        private WeakOrderQueue() {
            owner = null;
            availableSharedCapacity = null;
        }

        private WeakOrderQueue(Stack<?> stack, Thread thread) {
            head = tail = new Link();
            owner = new WeakReference<Thread>(thread);

            // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
            // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
            // Stack itself GCed.
            availableSharedCapacity = stack.availableSharedCapacity;
        }
	...
  }
...
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics