`
Donald_Draper
  • 浏览: 950844 次
社区版块
存档分类
最新评论

netty NioServerSocketChannel解析

阅读更多
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
引言
上一篇我们看了抽象nio消息通道,先来回顾一下:
抽象Nio消息通道AbstractNioMessageChannel,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

nio消息Unsafe(NioMessageUnsafe)读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件

今天终于到我们的目的了nio 服务端socket通道,NioServerSocketChannel,
package io.netty.channel.socket.nio;

import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.internal.SocketUtils;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.List;

/**
 * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
 * NIO selector based implementation to accept new connections.
 */
public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//通道元数据
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();//选择器提供者
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
    private final ServerSocketChannelConfig config;//通道配置
}

从上面来看,nio服务端socket通道内部有两个变量,一个为选择器提供者,一个为通道配置。
来看构造
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
 * Create a new instance using the given {@link SelectorProvider}.
 */
public NioServerSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    //创建通道配置
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}


来看创建socket通道,
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See [url=https://github.com/netty/netty/issues/2308]#2308[/url].
	 委托给选择器提供者,打开一个通道
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

关于通道配置NioServerSocketChannelConfig,我们nio服务端socket通道的内部方法看完,再来看
回头看通道配置,


来看其他方法:

//获取本地socket地址
@Override
public InetSocketAddress localAddress() {
    return (InetSocketAddress) super.localAddress();
}
//获取通道元数据
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
//获取通道配置
@Override
public ServerSocketChannelConfig config() {
    return config;
}
//判断通道,是否激活,主要通过通道关联socket的isBound方法判断
@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}
//获取关联socket通道
@Override
protected ServerSocketChannel javaChannel() {
    return (ServerSocketChannel) super.javaChannel();
}
//远端地址为空
@Override
public InetSocketAddress remoteAddress() {
    return null;
}
//安全获取本地socket地址
@Override
protected SocketAddress localAddress0() {
    return SocketUtils.localSocketAddress(javaChannel().socket());
}
//关闭通道
@Override
protected void doClose() throws Exception {
    javaChannel().close();
}

//绑定socket地址
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        //否则使用通道关联Socket的bind方法,绑定socket地址
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

从上面来看,通道实际绑定socket地址,首先判断jdk版本信息,如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址,
否则为通道关联Socket的bind方法。

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    //接受通道连接,并创建与客户端交互的socket通道
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
	    //将创建的与客户端交互的socket通道,添加到结果集
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
//SocketUtils
//安全接受socket连接
 public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
            @Override
            public SocketChannel run() throws IOException {
                return serverSocketChannel.accept();
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}


读取的消息委托给谁来处理呢,这要回到SeverBootStrap这篇文章,主要是ServerBootstrapAcceptor。
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572

下面我们来看简单说一下,从ServerBootStrap初始化通道开始:

下面我们来看初始化通道,这个是重点:
//SeverBootStrap
@Override  
void init(Channel channel) throws Exception {  
    final Map<ChannelOption<?>, Object> options = options0();  
    synchronized (options) {  
        //设置父Server通道选项  
        setChannelOptions(channel, options, logger);  
    }  
  
    final Map<AttributeKey<?>, Object> attrs = attrs0();  
    synchronized (attrs) {  
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {  
            @SuppressWarnings("unchecked")  
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();  
            //设置父Server通道属性  
            channel.attr(key).set(e.getValue());  
        }  
    }  
   //获取Server通道的Channel管道  
    ChannelPipeline p = channel.pipeline();  
    final EventLoopGroup currentChildGroup = childGroup;  
    final ChannelHandler currentChildHandler = childHandler;  
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;  
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;  
  
    synchronized (childOptions) {  
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));  
    }  
  
    synchronized (childAttrs) {  
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));  
    }  
  
    p.addLast(new ChannelInitializer<Channel>() {  
        @Override  
        public void initChannel(final Channel ch) throws Exception {  
            final ChannelPipeline pipeline = ch.pipeline();  
            ChannelHandler handler = config.handler();  
            if (handler != null) {  
            //将通道处理器添加到通道内部的Channel管道内  
                pipeline.addLast(handler);  
            }  
            ch.eventLoop().execute(new Runnable() {  
                @Override  
                public void run() {  
            //将Server引导配置监听器添加到通道内部的Channel管道内 ,这个是重点
                    pipeline.addLast(new ServerBootstrapAcceptor(  
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  
                }  
            });  
        }  
    });  
}  

我们来看引导配置监听器,实际为一个Inbound通道处理器

  
 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {  
        private final EventLoopGroup childGroup;//与客户端交互通道注册的事件循环组  
        private final ChannelHandler childHandler;//与客户端交互通道的通道处理器  
        private final Entry<ChannelOption<?>, Object>[] childOptions;//与客户端交互通道的选项配置  
        private final Entry<AttributeKey<?>, Object>[] childAttrs;//与客户端交互通道的属性  
        private final Runnable enableAutoReadTask;  
      
        ServerBootstrapAcceptor(  
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,  
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {  
            this.childGroup = childGroup;  
            this.childHandler = childHandler;  
            this.childOptions = childOptions;  
            this.childAttrs = childAttrs;  
      
            // Task which is scheduled to re-enable auto-read.  
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may  
            // not be able to load the class because of the file limit it already reached.  
            // 此任务用于开启通道自动读取配置,将会被所在的事件循环调度。  
            // See https://github.com/netty/netty/issues/1328  
            enableAutoReadTask = new Runnable() {  
                @Override  
                public void run() {  
             //开启通道自动读取配置  
                    channel.config().setAutoRead(true);  
                }  
            };  
        }  
       //通道读取操作
        @Override  
        @SuppressWarnings("unchecked")  
        public void channelRead(ChannelHandlerContext ctx, Object msg) {  
            //与客户端交互通道 ,这个就是在nio服务端socket通道中,doReadMessages方法接受客户端连接,
	    //创建的客户端交互socket通道
            final Channel child = (Channel) msg;  
            //配置与客户端交互通道的通道处理器  
            child.pipeline().addLast(childHandler);  
            //配置与客户端交互通道的选项  
            setChannelOptions(child, childOptions, logger);  
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
        //配置与客户端交互通道的属性  
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
            }  
      
            try {  
        //注册与客户端交互通道到childGroup事件循环组  
                childGroup.register(child).addListener(new ChannelFutureListener() {  
                    @Override  
                    public void operationComplete(ChannelFuture future) throws Exception {  
                        if (!future.isSuccess()) {  
                 //注册失败,则关闭通道  
                            forceClose(child, future.cause());  
                        }  
                    }  
                });  
            } catch (Throwable t) {  
                forceClose(child, t);  
            }  
        }  
         //关闭通道  
        private static void forceClose(Channel child, Throwable t) {  
            child.unsafe().closeForcibly();  
            logger.warn("Failed to register an accepted channel: {}", child, t);  
        }  
      
        @Override  
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
            final ChannelConfig config = ctx.channel().config();  
            if (config.isAutoRead()) {  
                // stop accept new connections for 1 second to allow the channel to recover  
         //发生异常,则停止接受连接请求1秒钟,允许通道恢复  
                // See https://github.com/netty/netty/issues/1328  
                config.setAutoRead(false);  
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);  
            }  
            // still let the exceptionCaught event flow through the pipeline to give the user  
            // a chance to do something with it  
     //触发异常  
            ctx.fireExceptionCaught(cause);  
        }  
    }  


从上面来看,doReadMessages方法,实际为当接受客户端的连接请求时,创建一个与客户端交互的socket通道,并添加到读操作结果集中,实际为socket通道集。并将socket通道集交给ServerBootStrap的引导配置监听器ServerBootstrapAcceptor处理,Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。 客户端连接服务端时,首先向服务端发送连接请求数据,服务端接受到连接请求时,创建一个与客户端交互的socket通道。

再来看其他方法
// Unnecessary stuff
//由于服务端通道用于接受客户端的请求,所有不支持连接,写消息,消息过滤等等操作
@Override
protected boolean doConnect(
        SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected void doFinishConnect() throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected SocketAddress remoteAddress0() {
    return null;
}

@Override
protected void doDisconnect() throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
    throw new UnsupportedOperationException();
}

我们再回到Nio服务端通道配置,
//Nio服务端通道配置, 为NioServerSocketChannel的内部类,这个我们单独列一篇文章来说
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
    private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel, javaSocket);
    }

    @Override
    protected void autoReadCleared() {
        clearReadPending();
    }
}


总结:

nio服务端socket通道NioServerSocketChannel内部有两个变量,一个为选择器提供者SelectorProvider,一个为通道配置ServerSocketChannelConfig。

通道实际绑定socket地址,首先判断jdk版本信息,如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址,否则为通道关联Socket的bind方法。

doReadMessages方法,实际为当接受客户端的连接请求时,创建一个与客户端交互的socket通道,并添加到读操作结果集中,实际为socket通道集。并将socket通道集交给ServerBootStrap的引导配置监听器ServerBootstrapAcceptor处理,Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。 客户端连接服务端时,首先向服务端发送连接请求数据,服务端接受到连接请求时,创建一个与客户端交互的socket通道。

由于服务端通道用于接受客户端的请求,所有不支持连接,写消息,消息过滤等等操作。

0
1
分享到:
评论

相关推荐

    NIO netty开发

    netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty...

    netty nio 技术文档

    netty nio 封装java nio用于网络分布式数据传输。

    Java高并发编程代码(Netty NIO 实例)

    Java高并发编程代码(Netty NIO 实例)

    Netty nio protocolbuf视频课程

    包含了Netty,NIO AIO,Mina知识的详解以及netty结合spring protocolbuf的源码

    基于netty的nio使用demo源码

    基于netty的nio使用demo源码

    netty服务器解析16进制数据

    netty服务器解析16进制数据

    Java视频教程 Java游戏服务器端开发 Netty NIO AIO Mina视频教程

    jaca视频教程 jaca游戏服务器端开发 Netty NIO AIO Mina视频教程 课程目录: 一、Netty快速入门教程 01、第一课NIO 02、第二课netty服务端 03、第三课netty客户端 04、第四课netty线程模型源码分析(一) 05、...

    netty解析报文,解决粘包拆包

    注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。

    netty服务器son解析

    netty服务器son解析

    Java_NIO框架Netty教程

    资源名称:Java_NIO框架Netty教程资源截图: 资源太大,传百度网盘了,链接在附件中,有需要的同学自取。

    netty示例NIO示例

    myeclipse开发通信示例,框架netty,代码本人写的,而且已测试通过,先运行NettyService,再运行NettyClient即可看到效果。nio示例也有,原理一样,运行先后顺序同netty.

    netty源码解析视频

    netty源码解析视频教程,深入浅出netty源码视频

    netty源码解析视频.txt

    mu ke 网 netty源码解析视频,你值得下载

    Netty源码解析.pdf

    netty源码解析PDF,网络编程

    Netty全套PDF

    Netty 5 全套PDF Netty 5用户指南 -Netty_in_Action(第五版-目录修正版)

    jvm、nio、netty优化使用.txt

    Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了TCP和UDP套接字服务器等网络编程。 “快速简便”并不意味着最终的应用程序将遭受可维护性或性能...

    Java Netty 版 Jt808协议解析工程

    Java Netty版完全符合JT808部标文档的开发规范。直接可以下载使用,不信拉倒。

    netty权威文档

    如何全面系统地掌握Netty,进行Netty NIO开发、Netty编解码开发、Netty多协议开发?如何通过对Netty源码的学习获得更深入地知识?掌握了Netty后,如何将其应用到实际架构中?Netty工程师的就业前景和可涉足的行业是怎样...

    java基于netty NIO的简单聊天室的实现

    主要介绍了java基于netty NIO的简单聊天室的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    t-io是基于aio(nio2)的网络编程框架和netty属于同类

    t-io是基于aio(nio2)的网络编程框架,和netty属于同类,但t-io更注重开发一线工程师的感受,提供了大量和业务相关的API。基于t-io来开发IM、TCP私有协议、RPC、游戏服务器端、推送服务、实时监控、物联网、UDP、...

Global site tag (gtag.js) - Google Analytics