`
Donald_Draper
  • 浏览: 950739 次
社区版块
存档分类
最新评论

netty 抽象nio消息通道

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
引言
上一篇文章我们看了抽象nio字节通道,先来回顾一下:
写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。
抽象nio字节通道是面向字节的通道,为Socket通道的父类,
今天我们来看ServerSocket通道的父类AbstractNioMessageChannel,面向消息的通道:
package io.netty.channel.nio;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;

import java.io.IOException;
import java.net.PortUnreachableException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
 */
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    boolean inputShutdown;//是否关闭输入流

    /**
     * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
     */
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}

来看实际写操作:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();

    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            if ((interestOps & SelectionKey.OP_WRITE) != 0) {
	       //消息已发送完,从选择key兴趣集中移除写操作事件
                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
            }
            break;
        }
        try {
            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
	        //写消息
                if (doWriteMessage(msg, in)) {
                    done = true;
                    break;
                }
            }

            if (done) {//如果读完消息,则从通道刷新链上移除写请求
                in.remove();
            } else {
                // Did not write all messages.
                if ((interestOps & SelectionKey.OP_WRITE) == 0) {
		    //消息没发送完,如果需要添加写事件到选择key的兴趣事件集
                    key.interestOps(interestOps | SelectionKey.OP_WRITE);
                }
                break;
            }
        } catch (IOException e) {
            if (continueOnWriteError()) {
	        //如果写异常时需要移除写请求,则移除
                in.remove(e);
            } else {
                throw e;
            }
        }
    }
}

/**
 * Write a message to the underlying {@link java.nio.channels.Channel}.
 *写一个消息到底层通道,待子类扩展
 * @return {@code true} if and only if the message has been written
 */
protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
/**

 * Returns {@code true} if we should continue the write loop on a write error.
 */
protected boolean continueOnWriteError() {
    return false;
}

从上面可以看出 抽象Nio消息通道,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

再来看其他方法:
//开始读操作
@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    super.doBeginRead();
}
//创建与底层通道交流的Unsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}


从上面的方法可以看出,实际返回的为nio消息Unsafe,我们来看NioMessageUnsafe,
private final class NioMessageUnsafe extends AbstractNioUnsafe {
    private final List<Object> readBuf = new ArrayList<Object>();
    @Override
    public void read() {
        assert eventLoop().inEventLoop();
	//获取通道配置,Channel管道,接受字节buf分配器Handle
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
		    //从通道缓冲区读取数据
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
		       //没有数据可读取
                        break;
                    }
                    if (localRead < 0) {
		       //通道已关闭
                        closed = true;
                        break;
                    }
                    //更新读取消息计数器
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            
            int size = readBuf.size();
	    //遍历读取的消息集,通知通道处理消息,即触发管道fireChannelRead事件
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
	    //读取完毕,触发管道fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (exception != null) {
	        //根据异常判断是否需要,关闭读任务
                closed = closeOnReadError(exception);
                //触发通道fireExceptionCaught事件
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
	        //关闭读任务
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
	        //如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件
                removeReadOp();
            }
        }
    }
}

//AbstractNioMessageChannel

/**
 * Read messages into the given array and return the amount which was read.
 从通道缓冲区读取消息,方法指定的buf集合中,并返回读取的消息数量,待子类扩展
 */
protected abstract int doReadMessages(List<Object> buf) throws Exception;


//判断异常发生时,是否需要关闭读任务
protected boolean closeOnReadError(Throwable cause) {
    // ServerChannel should not be closed even on IOException because it can often continue
    // accepting incoming connections. (e.g. too many open files)
    return cause instanceof IOException &&
            !(cause instanceof PortUnreachableException) &&
            !(this instanceof ServerChannel);
}

从上面可以看出:nio消息Unsafe读操作,从通道接收缓冲区读取数据,
通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,
触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则
触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,
则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,
则从选择key兴趣事件集移除读操作事件



总结:
抽象Nio消息通道AbstractNioMessageChannel,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

nio消息Unsafe(NioMessageUnsafe)读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件

0
0
分享到:
评论

相关推荐

    netty-all-4.1.29.Final-sources.jar 最新版netty源码

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programm ...

    netty-all-4.1.97.Final-20230731.020124-2.jar netty最新jar包,可直接导入使用

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming

    netty-netty-4.1.96.Final.tar.gz 官网最新版Netty Project

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programm ing

    netty-all-4.1.29.Final-javadoc.jar 压缩包

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming ...

    netty-all-4.1.29.Final.jar最新版导入直接用

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming ...

    netty in action 中文版

    作为一个NIO client-server框架,Netty 提供了这样的一个间接的解决方法。Netty提供了高层次的抽象来简化TCP和UDP服务器的编程,但是你仍然可以使用底层地 API。

    netty权威指南 pdf

    文摘:13.2.1 Netty服务端创建时序图 下面我们对Netty服务端创建的关键步骤和原理进行讲解。...底层通过门面模式对各种能力进行抽象和封装,尽量不需要用户跟过多的底层API打交道,降低用户的开发难度。

    netty3.10.6稳定版

    它基于Java NIO进行设计,因此提供了高层次的抽象和简化的API,使得开发者可以更加方便地构建网络应用。 Netty在多个领域都有广泛的应用。例如,在大数据领域中,Hadoop的高性能通信和序列化组件Avro的RPC框架默认...

    netty4.0.56稳定版本

    它基于Java NIO进行设计,因此提供了高层次的抽象和简化的API,使得开发者可以更加方便地构建网络应用。 Netty在多个领域都有广泛的应用。例如,在大数据领域中,Hadoop的高性能通信和序列化组件Avro的RPC框架默认...

    Netty In Action中文版

    ”作为一个NIO client-server框架,Netty提供了这样的一个间接的解决方法。Netty提供了高层次的抽象来简化TCP和UDP服务器的编程,但是你仍然可以使用底层地API。(David John Wheeler有一句名言“计算机科学中的任何...

    高级java笔试题-ShiftJava:学到头秃的Java的小笔记

    对象与类:继承、接口、抽象类、内部类、枚举、常用类、常用接口、注解等。 异常:异常体系等。 泛型:用法、类型参数、通配符等。 2. 集合与容器 集合容器类:队列、列表、栈、Map、Set 等结构,含 ArrayList、...

Global site tag (gtag.js) - Google Analytics