`
Donald_Draper
  • 浏览: 950829 次
社区版块
存档分类
最新评论

netty 抽象通道后续

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
引言:
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。

添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。

添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。

移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件

从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。

将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。

看完了抽象Unsafe定义和通道Outbound缓冲区,今天我们回到抽象通道,接着抽象通道初始化,继续看抽象通道的其他方法

先把抽象通道的变量声明贴出来及构造,以便理解:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {  
  
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);  
  
    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道关闭异常  
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");//确保通道打开方法调用时,通道关闭异常  
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");//close方法调用时,通道关闭异常  
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");//write方法调用时,通道关闭异常  
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道还未连接异常  
  
    private final Channel parent;//所属通道  
    private final ChannelId id;//通道id  
    private final Unsafe unsafe;//硬件底层操作类  
    private final DefaultChannelPipeline pipeline;//Channel管道  
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);//空异步任务  
    private final CloseFuture closeFuture = new CloseFuture(this);//异步关闭任务  
  
    private volatile SocketAddress localAddress;//本地socket地址  
    private volatile SocketAddress remoteAddress;//远端socket地址  
    private volatile EventLoop eventLoop;//通道注册的事件循环  
    private volatile boolean registered;//是否注册  
    /** Cache for the string representation of this channel */  
    private boolean strValActive;  
    private String strVal;  
    ...  
        /** 
     * Creates a new instance. 
     * 
     * @param parent 
     *        the parent of this channel. {@code null} if there's no parent. 
     */  
    protected AbstractChannel(Channel parent) {  
        this.parent = parent;  
        //创建通道id  
        id = newId();  
        //创建底层操作类unsafe  
        unsafe = newUnsafe();  
        //新建Channel管道  
        pipeline = newChannelPipeline();  
    }  
      
    /** 
     * Creates a new instance. 
     * 
     * @param parent 
     *        the parent of this channel. {@code null} if there's no parent. 
     */  
    protected AbstractChannel(Channel parent, ChannelId id) {  
        this.parent = parent;  
        this.id = id;  
        unsafe = newUnsafe();  
        pipeline = newChannelPipeline();  
    }  
}  

上面抽象通道的变量和构造有单独的文章讲述,这里不再说:
来看其他方法
//判断通道是否可写
@Override
public boolean isWritable() {
    //如果unsafe关联的通道Outbound 缓冲区不为空,且可写返回true
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    return buf != null && buf.isWritable();
}

//直到通道可写前,通道Outbound 缓冲区的字节数。如果通道不可写,则返回0 
@Override
public long bytesBeforeUnwritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
    // We should be consistent with that here.
    return buf != null ? buf.bytesBeforeUnwritable() : 0;
}
//获取直到通道可写,通道底层buf有多少字节数据需要发送。如果可写返回0
@Override
public long bytesBeforeWritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
    // We should be consistent with that here.
    return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
}
//获取所属通道
@Override
public Channel parent() {
    return parent;
}
//获取通道内部Channel管道
@Override
public ChannelPipeline pipeline() {
    return pipeline;
}
//获取通道字节buf分配器
@Override
public ByteBufAllocator alloc() {
    return config().getAllocator();
}
//获取通道所在的事件循环
@Override
public EventLoop eventLoop() {
    EventLoop eventLoop = this.eventLoop;
    if (eventLoop == null) {
        throw new IllegalStateException("channel not registered to an event loop");
    }
    return eventLoop;
}
//获取通道本地地址
@Override
public SocketAddress localAddress() {
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        try {
            this.localAddress = localAddress = unsafe().localAddress();
        } catch (Throwable t) {
            // Sometimes fails on a closed socket in Windows.
            return null;
        }
    }
    return localAddress;
}

/**
 * @deprecated no use-case for this.
 */
@Deprecated
protected void invalidateLocalAddress() {
    localAddress = null;
}
//获取远端socket地址
@Override
public SocketAddress remoteAddress() {
    SocketAddress remoteAddress = this.remoteAddress;
    if (remoteAddress == null) {
        try {
            this.remoteAddress = remoteAddress = unsafe().remoteAddress();
        } catch (Throwable t) {
            // Sometimes fails on a closed socket in Windows.
            return null;
        }
    }
    return remoteAddress;
}

/**
 * @deprecated no use-case for this.
 */
@Deprecated
protected void invalidateRemoteAddress() {
    remoteAddress = null;
}
//判断通道是否注册到事件循环
@Override
public boolean isRegistered() {
    return registered;
}
//绑定本地socket地址
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
}
//连接远端socket地址
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline.connect(remoteAddress);
}
//上面两个方法的合体
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return pipeline.connect(remoteAddress, localAddress);
}
//断开通道连接
@Override
public ChannelFuture disconnect() {
    return pipeline.disconnect();
}
//关闭通道
@Override
public ChannelFuture close() {
    return pipeline.close();
}
//从事件循环反注册
@Override
public ChannelFuture deregister() {
    return pipeline.deregister();
}
//刷新通道
@Override
public Channel flush() {
    pipeline.flush();
    return this;
}
下面几个方法与上面不同是,带了一个异步任务结果,在操作完成时,通道任务结果
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, localAddress, promise);
}

@Override
public ChannelFuture disconnect(ChannelPromise promise) {
    return pipeline.disconnect(promise);
}

@Override
public ChannelFuture close(ChannelPromise promise) {
    return pipeline.close(promise);
}

@Override
public ChannelFuture deregister(ChannelPromise promise) {
    return pipeline.deregister(promise);
}
//通道读操作
@Override
public Channel read() {
    pipeline.read();
    return this;
}
//写消息
@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
    return pipeline.write(msg, promise);
}
//写并刷新消息
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return pipeline.writeAndFlush(msg, promise);
}
//创建异步可写任务
@Override
public ChannelPromise newPromise() {
    return pipeline.newPromise();
}
//创建异步可写进度任务
@Override
public ChannelProgressivePromise newProgressivePromise() {
    return pipeline.newProgressivePromise();
}
//创建已经成功的任务
@Override
public ChannelFuture newSucceededFuture() {
    return pipeline.newSucceededFuture();
}
//创建已经失败的任务
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
    return pipeline.newFailedFuture(cause);
}
//获取关闭异步任务
@Override
public ChannelFuture closeFuture() {
    return closeFuture;
}
//获取通道unsafe
@Override
public Unsafe unsafe() {
    return unsafe;
}
//创建空异步任务
 @Override
public final ChannelPromise voidPromise() {
    return pipeline.voidPromise();
}

在上面我们看到好多方法时委托给通道的Channel管道,这是因为:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {


public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {


即通道实际上是一个ChannelOutboundInvoker,当通道相关操作发生时,触发通道相关操作事件。

从上面可以看出,通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。


总结:

通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics