netty 抽象通道后续

netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:


添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。



将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。


public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {  
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);  
    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道关闭异常  
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");//确保通道打开方法调用时,通道关闭异常  
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");//close方法调用时,通道关闭异常  
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");//write方法调用时,通道关闭异常  
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道还未连接异常  
    private final Channel parent;//所属通道  
    private final ChannelId id;//通道id  
    private final Unsafe unsafe;//硬件底层操作类  
    private final DefaultChannelPipeline pipeline;//Channel管道  
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);//空异步任务  
    private final CloseFuture closeFuture = new CloseFuture(this);//异步关闭任务  
    private volatile SocketAddress localAddress;//本地socket地址  
    private volatile SocketAddress remoteAddress;//远端socket地址  
    private volatile EventLoop eventLoop;//通道注册的事件循环  
    private volatile boolean registered;//是否注册  
    /** Cache for the string representation of this channel */  
    private boolean strValActive;  
    private String strVal;  
     * Creates a new instance. 
     * @param parent 
     *        the parent of this channel. {@code null} if there's no parent. 
    protected AbstractChannel(Channel parent) {  
        this.parent = parent;  
        id = newId();  
        unsafe = newUnsafe();  
        pipeline = newChannelPipeline();  
     * Creates a new instance. 
     * @param parent 
     *        the parent of this channel. {@code null} if there's no parent. 
    protected AbstractChannel(Channel parent, ChannelId id) {  
        this.parent = parent;  
        this.id = id;  
        unsafe = newUnsafe();  
        pipeline = newChannelPipeline();  

public boolean isWritable() {
    //如果unsafe关联的通道Outbound 缓冲区不为空,且可写返回true
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    return buf != null && buf.isWritable();

//直到通道可写前,通道Outbound 缓冲区的字节数。如果通道不可写,则返回0 
public long bytesBeforeUnwritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
    // We should be consistent with that here.
    return buf != null ? buf.bytesBeforeUnwritable() : 0;
public long bytesBeforeWritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
    // We should be consistent with that here.
    return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
public Channel parent() {
    return parent;
public ChannelPipeline pipeline() {
    return pipeline;
public ByteBufAllocator alloc() {
    return config().getAllocator();
public EventLoop eventLoop() {
    EventLoop eventLoop = this.eventLoop;
    if (eventLoop == null) {
        throw new IllegalStateException("channel not registered to an event loop");
    return eventLoop;
public SocketAddress localAddress() {
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        try {
            this.localAddress = localAddress = unsafe().localAddress();
        } catch (Throwable t) {
            // Sometimes fails on a closed socket in Windows.
            return null;
    return localAddress;

 * @deprecated no use-case for this.
protected void invalidateLocalAddress() {
    localAddress = null;
public SocketAddress remoteAddress() {
    SocketAddress remoteAddress = this.remoteAddress;
    if (remoteAddress == null) {
        try {
            this.remoteAddress = remoteAddress = unsafe().remoteAddress();
        } catch (Throwable t) {
            // Sometimes fails on a closed socket in Windows.
            return null;
    return remoteAddress;

 * @deprecated no use-case for this.
protected void invalidateRemoteAddress() {
    remoteAddress = null;
public boolean isRegistered() {
    return registered;
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline.connect(remoteAddress);
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return pipeline.connect(remoteAddress, localAddress);
public ChannelFuture disconnect() {
    return pipeline.disconnect();
public ChannelFuture close() {
    return pipeline.close();
public ChannelFuture deregister() {
    return pipeline.deregister();
public Channel flush() {
    return this;
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);

public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, localAddress, promise);

public ChannelFuture disconnect(ChannelPromise promise) {
    return pipeline.disconnect(promise);

public ChannelFuture close(ChannelPromise promise) {
    return pipeline.close(promise);

public ChannelFuture deregister(ChannelPromise promise) {
    return pipeline.deregister(promise);
public Channel read() {
    return this;
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);

public ChannelFuture write(Object msg, ChannelPromise promise) {
    return pipeline.write(msg, promise);
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return pipeline.writeAndFlush(msg, promise);
public ChannelPromise newPromise() {
    return pipeline.newPromise();
public ChannelProgressivePromise newProgressivePromise() {
    return pipeline.newProgressivePromise();
public ChannelFuture newSucceededFuture() {
    return pipeline.newSucceededFuture();
public ChannelFuture newFailedFuture(Throwable cause) {
    return pipeline.newFailedFuture(cause);
public ChannelFuture closeFuture() {
    return closeFuture;
public Unsafe unsafe() {
    return unsafe;
public final ChannelPromise voidPromise() {
    return pipeline.voidPromise();

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {







