`
Donald_Draper
  • 浏览: 950775 次
社区版块
存档分类
最新评论

netty 抽象nio字节通道

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
引言
前一篇文章,我们看了抽象nio通道,先来回顾一下:
抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。

注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集

抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。

今天我们来看一下抽象nio字节通道AbstractNioByteChannel
package io.netty.channel.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
 */
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//通道元数据
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(FileRegion.class) + ')';

    private Runnable flushTask;//刷新任务
     /**
     * Create a new instance
     *创建nio字节通道
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        //读操作事件为SelectionKey.OP_READ
        super(parent, ch, SelectionKey.OP_READ);
    }
}

来看实际写操作:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;//写操作自旋此数计数器

    boolean setOpWrite = false;
    for (;;) {
        //获取当前通道Outbound缓冲区中的当前待刷新消息
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
	    //清除通道选择key兴趣事件集中写操作事件
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        if (msg instanceof ByteBuf) {//如果为字节buf
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
	        //可读字节数为0,则从通道Outbound缓存区中移除写请求
                in.remove();
                continue;
            }

            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
	        //获取通道配置的写请求自旋次数
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
	        //实际发送字节数据
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
		    //没有字节要发送
                    setOpWrite = true;
                    break;
                }
                //更新刷新字节计数器
                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
		    //如果buf不可读,则数据发送完毕
                    done = true;
                    break;
                }
            }
            //报告字节数据发送进度
            in.progress(flushedAmount);

            if (done) {
	        //发送完,则移除写请求
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else if (msg instanceof FileRegion) {
	    //如果是文件Region消息
            FileRegion region = (FileRegion) msg;
            boolean done = region.transferred() >= region.count();
            if (!done) {
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i--) {
		   //完成实际写操作
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }
                    //更新刷新字节计数器
                    flushedAmount += localFlushedAmount;
                    if (region.transferred() >= region.count()) {
                        done = true;
                        break;
                    }
                }
                //更新数据发送进度
                in.progress(flushedAmount);
            }

            if (done) {
	        //发送完,则移除写请求
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
    }
    //完成写后继任务
    incompleteWrite(setOpWrite);
}

/**
 * Write a {@link FileRegion}
 *写一个文件region,待子类扩展
 * @param region        the {@link FileRegion} from which the bytes should be written
 * @return amount       the amount of written bytes
 */
protected abstract long doWriteFileRegion(FileRegion region) throws Exception;

/**
 * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
 写一个字节buf,待子类扩展
 * @param buf           the {@link ByteBuf} from which the bytes should be written
 * @return amount       the amount of written bytes
 */
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;


//完成写后继任务
protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        //发送数据完成,则重新添加写操作事件到选择key兴趣事件集
        setOpWrite();
    } else {
        //否则,继续刷新通道Outbound缓冲区中的写请求
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}
//添加写操作事件到选择key兴趣事件集
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
//清除通道选择key兴趣事件集中写操作事件
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}

从上面可以看出,写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为
字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

//过滤消息
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
	//如果消息为direct buf,直接返回消息
        if (buf.isDirect()) {
            return msg;
        }
        //否则包装buf为direct buf,这个newDirectBuffer方法在上一篇文章中以说
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        //如果是文件Region直接返回
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}


再看其他方法:
//获取通道元数据
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
/**
 * Shutdown the input side of the channel.
 关闭通道输入流
 */
protected abstract ChannelFuture shutdownInput();
//通道是否关闭输入流
protected boolean isInputShutdown0() {
    return false;
}
//创建底层通道操作类Unsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioByteUnsafe();
}

从newUnsafe方法,返回的为NioByteUnsafe
我们来看nio字节Unsafe:
protected class NioByteUnsafe extends AbstractNioUnsafe {
    //读操作
    @Override
    public final void read() {
        //获取通道配置,Channel管道,字节buf分配器,接受字节buf分配器Handle
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
	        //分配一个字节buf
                byteBuf = allocHandle.allocate(allocator);
		//读取通道接收缓冲区数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
		    //没有数据可读,则释放buf
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                //更新读取消息计数器
                allocHandle.incMessagesRead(1);
                readPending = false;
		//通知通道处理读取数据,触发Channel管道线的fireChannelRead事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
            //读取操作完毕
            allocHandle.readComplete();
	    //触发Channel管道线的fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (close) {
	        //如果通道关闭,关闭读操作
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
	    //处理读操作异常
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
	        //读操作完毕,且没有配置自动读,则从选择key兴趣集中移除读操作事件
                removeReadOp();
            }
        }
    }
     //关闭读操作
    private void closeOnRead(ChannelPipeline pipeline) {
        if (!isInputShutdown0()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
	        //关闭通道输入流
                shutdownInput();
		//触发通道输入关闭事件
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());
            }
        } else {
	    //触发通道输入关闭没有数据可读取事件
            pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        }
    }
    //处理读操作异常
     private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            RecvByteBufAllocator.Handle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                readPending = false;
		//字节buf中还有数据没处理,则继续处理
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
	//读取操作完毕,触发Channel管道线的fireChannelReadComplete事件及fireExceptionCaught事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) { 
	   //如果需要,关闭读操作
            closeOnRead(pipeline);
        }
    } 
}

从上面可以看出,nio字节Unsafe读操作,从通道接收缓冲区读取数据,
通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,
触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则
触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则
读取缓存区中没有读完的数据,并通道通道处理剩余数据。

总结:

写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。


附:

//ChannelMetadata
/**
 * Represents the properties of a {@link Channel} implementation.
 */
public final class ChannelMetadata {

    private final boolean hasDisconnect;//是否基于无连接的通信,fasle-》TCP,true-》UDP
    private final int defaultMaxMessagesPerRead;//每次读取,允许读取的最大消息数
    /**
     * Create a new instance
     *
     * @param hasDisconnect     {@code true} if and only if the channel has the {@code disconnect()} operation
     *                          that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
     *                          again, such as UDP/IP.
     * @param defaultMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this value will be
     * set for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
     */
    public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
        if (defaultMaxMessagesPerRead <= 0) {
            throw new IllegalArgumentException("defaultMaxMessagesPerRead: " + defaultMaxMessagesPerRead +
                                               " (expected > 0)");
        }
        this.hasDisconnect = hasDisconnect;
        this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
    }
    ...
}

//ChannelInputShutdownReadComplete
package io.netty.channel.socket;

/**
 * User event that signifies the channel's input side is shutdown, 
 and we tried to shut it down again. This typically
 * indicates that there is no more data to read.
 */
public final class ChannelInputShutdownReadComplete {
    public static final ChannelInputShutdownReadComplete INSTANCE = new ChannelInputShutdownReadComplete();

    private ChannelInputShutdownReadComplete() {
    }
}


//ChannelInputShutdownEvent
package io.netty.channel.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;

/**
 * Special event which will be fired and passed to the
 * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)} methods once the input of
 * a {@link SocketChannel} was shutdown and the {@link SocketChannelConfig#isAllowHalfClosure()} method returns
 * {@code true}.
 */
public final class ChannelInputShutdownEvent {

    /**
     * Instance to use
     */
    @SuppressWarnings("InstantiationOfUtilityClass")
    public static final ChannelInputShutdownEvent INSTANCE = new ChannelInputShutdownEvent();

    private ChannelInputShutdownEvent() { }
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics