`
海浪儿
  • 浏览: 271566 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论
阅读更多

本文为原创,转载请注明出处

netty4源码分析-socket

       服务端启动的第一步必须先创建一个监听套接字ServerSocketChannel,该过程是由ChannelFuture f = b.bind(port)中的bind触发。下面详细分析其过程:

       Bind源码如下,代码位于ServerBootstrap的父类AbstractBootstrap

//AbstractBootstrap
public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

      validate()方法的作用为:校验:bossGroup、BootstrapChannelFactory、childHandler非空。如果childGroup为空,则复用bossGroup,将bossGroup赋值给childGroup。

 

      接着来看doBind的逻辑:

//AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regPromise = initAndRegister();
        final Channel channel = regPromise.channel();
        final ChannelPromise promise = channel.newPromise();
        if (regPromise.isDone()) {
            doBind0(regPromise, channel, localAddress, promise);
        } else {
            regPromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(future, channel, localAddress, promise);
                }
            });
        }
        return promise;
    }

     

         重点分析里面的initAndRegister()方法

//AbstractBootstrap
final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regPromise = channel.newPromise();
        group().register(channel, regPromise);
        if (regPromise.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regPromise;
    }

     

a)首先分析以下代码:

final Channel channel = channelFactory().newChannel()

      channelFactory()方法返回之前创建的BootstrapChannelFactory,里面的newChannel()方法会根据反射创建一个ServerSocketChannel

//BootstrapChannelFactory       
public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

        注:clazz是在服务端启动的这段代码(b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)中设置的。

        clazz.newInstance()会调用NioServerSocketChannel的默认构造函数

// NioServerSocketChannel
public NioServerSocketChannel() {
        super(null, newSocket(), SelectionKey.OP_ACCEPT);
        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
private static ServerSocketChannel newSocket() {
        try {
            return ServerSocketChannel.open();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

       注意newSocket中的这行代码:

return ServerSocketChannel.open();

       此处就是服务端监听套接字ServerSocketChannel创建的地方。

 

       既然是使用NIO,那么设置创建的ServerSocketChannel为非阻塞是在哪个地方发生的呢?看下这行代码

super(null, newSocket(), SelectionKey.OP_ACCEPT);

       它会对NioServerSocketChannel的父类进行初始化:NioServerSocketChannel的父类是AbstractNioMessageChannel,其构造方法仅仅初始化其父类AbstractNioChannel,父类构造方法如下:

//AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

       ch.configureBlocking(false)此处就将之前创建的ServerSocketChannel设置为非阻塞模式。

       

       该方法里还有三点需要注意:

       1、super(parent)会调用AbstractNioChannel的父类AbstractChannel的构造方法

// AbstractChannel.java
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }

       newUnsafe()是由子类AbstractNioMessageChannel实现的,里面实例化了一个内部类NioMessageUnsafe(注:该类很重要,里面定义了read方法,会触发accept的调用,后面对其重点分析)。

      2、this.readInterestOp = readInterestOp:设置channel的ops为SelectionKey.OP_ACCEPT(值为16)

      3、pipeline = new DefaultChannelPipeline(this),创建作用于ServerSocketChannel的管道Pipeline

// DefaultChannelPipeline
public DefaultChannelPipeline(Channel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;

        TailHandler tailHandler = new TailHandler();
        tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);

        HeadHandler headHandler = new HeadHandler(channel.unsafe());
        head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);

        head.next = tail;
        tail.prev = head;
    }

       DefaultChannelPipeline维护了一个以DefaultChannelHandlerContext为元素的双向链表结构,Head是一个Outbound处理器,而tail是一个Inbound处理器。经过此步骤后,管道中的处理器链表为:Head->tail。

 

b)再来分析以下代码

init(channel)

       该方法由子类ServerBootstrap实现

// ServerBootstrap.java
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

       该方法主要做了两件事:

       1、设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs。

       2、为NioServerSocketChannel对应的管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegistered事件发生时将ServerBootstrapAcceptor加入到管道中。
 
c) 最后分析 以下代码:
group().register(channel, regPromise);

       实际是调用MultithreadEventLoopGroup的register方法

//MultithreadEventLoopGroup
public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }

    next方法从bossGroup中选择一个EventExecutor(它实际是一个SingleThreadEventLoop),然后执行register方法

//SingleThreadEventLoop
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }
        channel.unsafe().register(this, promise);
        return promise;
    }

     channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法

//AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    closeForcibly();
                    promise.setFailure(t);
                }
            }
        }

        此处开启了eventloop中的线程(即启动了boss线程),并将register0任务加入到boss线程的队列中。经过此步骤后,boss线程的任务队列仅含有一个任务,即register0任务,且正在被执行。

        接着分析register0任务具体干了什么事情

//AbstractUnsafe
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                Runnable postRegisterTask = doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (postRegisterTask != null) {
                    postRegisterTask.run();
                }
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
                closeFuture.setClosed();
            }
        }

protected Runnable doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return null;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

       看一下doRegister代码,看到这行代码了吧

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

       它会调用java.nio.channels.spi. AbstractSelectableChannel的register方法, 将ServerSocketChannel、0、以及this注册到selector中并得到对应的selectionkey。

       接着再看register0方法中的promise.setSuccess(),将promise设置为success,就会触发异步回调,回调之前main函数所在的线程中为ChannelPromise添加的listner,即AbstractBootstrap的以下代码:

//AbstractBootstrap.java
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regPromise = initAndRegister();
        final Channel channel = regPromise.channel();
        final ChannelPromise promise = channel.newPromise();
        if (regPromise.isDone()) {
            doBind0(regPromise, channel, localAddress, promise);
        } else {
            regPromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(future, channel, localAddress, promise);
                }
            });
        }
        return promise;
}

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

       经过此步骤后,boss线程的任务队列数量由原来的1个增加到了2个,即正在执行的register0任务以及本次新增的bind任务。Bind任务在下一篇文章中进行分析。

 

       再接着分析register0任务中的此行代码pipeline.fireChannelRegistered()

//DefaultChannelPipeline
public ChannelPipeline fireChannelRegistered() {
        head.fireChannelRegistered();
        return this;
    }

 

//DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelRegistered() {
        final DefaultChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
        return this;
}

private void invokeChannelRegistered() {
        try {
            ((ChannelInboundHandler) handler).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

       ChannelRegistered是一个Inbound事件,因此会按照head->tail的顺序执行所有的inbound处理器,目前有三个处理器:head-> ChannelInitializer ->tail->,ChannelInitializer和tail都是inbound处理器,所以看一下ChannelInitializer的invokeChannelRegistered方法.

//ChannelInitializer
public final void channelRegistered(ChannelHandlerContext ctx)
            throws Exception {
        boolean removed = false;
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            ctx.pipeline().remove(this);
            removed = true;
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (!removed) {
                ctx.pipeline().remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}

        该方法主要做了以下几件事:

        1、initChannel方法是在ServerBootstrap中执行Init方法时,实例化内部类ChannelInitializer实现的

p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });

         其功能就是将ServerBootstrapAcceptor作为一个inbound处理器加入到管道中,此时的处理器链表为: Head->ChannelInitializer->ServerBootstrapAcceptor->tail

       2、 然后ChannelInitializer将自己从管道中删除,此时的处理器链表变为:Head->ServerBootstrapAcceptor->tail

        3、 接着调用ServerBootstrapAcceptor和tail的channelRegistered方法,都没有做啥实质性的事情,最后以tail的空实现结束。

     

      再分析register0任务中的以下代码

//AbstractUnsafe
if (isActive()) 
{
   pipeline.fireChannelActive();
 }

// NioServerSocketChannel
public boolean isActive() {
        return javaChannel().socket().isBound();
    }

        由于对于监听套接字还没有执行bind操作,所以isActive返回false,不会执行pipeline.fireChannelActive()该行代码。执行完此代码后,register0任务就执行完了,boss线程中的任务队列中仅剩下bind任务。

 

 

       总结:initAndRegister()方法主要做了以下几件事情:

   1、 创建服务端监听套接字ServerSocketChannel

        2、 设置监听套接字为非阻塞

        3、 设置channel当前感兴趣的事件为SelectionKey.OP_ACCEPT(值为16)

        4、 创建作用于ServerSocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。

        5、 设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs

        6、 为NioServerSocketChannel对应的管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegisgered事件发生时将ServerBootstrapAcceptor加入到管道中。

        7、 启动了boss线程,并将register0任务加入到boss线程的队列中。而register0做的事情为:将ServerSocketChannel、0、注册到selector中并得到对应的selectionkey。然后触发绑定端口的操作,bind任务加入到boss线程的任务队列中,该内容在下一篇文章中分析。

        8、通过channelRegistered事件,将ServerBootstrapAcceptor加入到管道中,并移除ChannelInitializer,经过此步骤后,管道中的处理器链表为:head(outbound)-> ServerBootstrapAcceptor (inbound)->tail(inbound)

       管道从创建到现在这段时间内,处理器链表的变化历史为:

        head->tail

        head->ChannelInitializer(inbound)->tail

        head->ServerBootstrapAcceptor(inbound)->tail

分享到:
评论
6 楼 zhenxing_zr 2017-01-19  
看了这么多文章,就只有看这个才真正把netty流程弄清楚,多谢大牛
5 楼 pengyue77 2015-01-21  
这个源码分析写得非常好,思路清晰,非常有条理。博主的文字组织能力相当到位,受教了
4 楼 langwolf 2014-08-14  
  思路清晰,分析严谨!   受益了。
3 楼 rentaoshu 2013-12-09  
2 楼 lxzh504 2013-08-14  
1 楼 lxzh504 2013-08-14  
网上关于netty4final版本的源码分析很少,这是我看到的最好的对服务端源码的分析。

相关推荐

    高清Netty5.0架构剖析和源码解读

    NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...

    java后端源码部署-fpush::antenna_bars:即时消息推送服务(即时通讯),基于Netty+protobuf--InstantMessagingpush

    java后端源码部署 ...源码和技术架构分析详情见 :backhand_index_pointing_right: 运行 eclipse/IDEA里 Step1 右键run as--java application-- FpushServerApp.java Step2 右键run as--java application-- FpushCl

    精通并发与netty视频教程(2018)视频教程

    47_Netty服务器与客户端编码模式回顾及源码分析准备 48_Netty与NIO系统总结及NIO与Netty之间的关联关系分析 49_零拷贝深入剖析及用户空间与内核空间切换方式 50_零拷贝实例深度剖析 51_NIO零拷贝彻底分析与Gather...

    基于javatcpsocket通信的拆包和装包源码-NettyTree:网状树

    6)Netty源码分析 ByteBuf工作原理 Channel, Unsafe ChannelPipline, ChannelHandler EventLoop, EventLoopGroup Future, Promise 7) Netty逻辑架构 8)Netty中的多线程编程 9)Netty与RPC 10)Netty的可靠性 ...

    93个netty高并发教学视频下载.txt

    88_ReplayingDecoder源码分析与特性解读;89_Netty常见且重要编解码器详解;90_TCP粘包与拆包实例演示及分析;91_Netty自定义协议与TCP粘包拆包问题解决之道;92_精通并发与Netty课程总结与展望

    基于javatcpsocket通信的拆包和装包源码-Netty-practice:Netty学习实践

    源码分析 BIO/NIO/AIO基础 阻塞I/O 非阻塞I/O I/O复用 信号驱动的I/O 异步I/O Java I/O模型 同步阻塞IO 1:1同步阻塞IO通信模型 M:N形式的同步阻塞IO通信模型 非阻塞式IO模型(NIO) NIO+单线程Reactor模型 NIO+多线程...

    精通并发与netty 无加密视频

    第47讲:Netty服务器与客户端编码模式回顾及源码分析准备 第48讲:Netty与NIO系统总结及NIO与Netty之间的关联关系分析 第49讲:零拷贝深入剖析及用户空间与内核空间切换方式 第50讲:零拷贝实例深度剖析 第51讲...

    Java视频教程 Java游戏服务器端开发 Netty NIO AIO Mina视频教程

    04、第四课netty线程模型源码分析(一) 05、第五课netty线程模型源码分析(二) 06、第六课netty5案例学习 07、第七课netty学习之心跳 08、第八课protocol buff学习 09.第九课自定义序列化协议之自定义序列化协议 ...

    精通并发与 netty 视频教程(2018)视频教程

    52_NioEventLoopGroup源码分析与线程数设定 53_Netty对Executor的实现机制源码分析 54_Netty服务端初始化过程与反射在其中的应用分析 55_Netty提供的Future与ChannelFuture优势分析与源码讲解 56_Netty服务器地址...

    netty权威指南(第2版)

    学习netty必备资源,作者对于netty由简入深足部剖析,分析netty源码,以及各种netty应用场景,然你深入了解netty高性能通信框架。

    Netty教程:十二个实例带你轻松学习Netty

     每个实例简洁、清爽、实用,重点在“用”上,即培训大家如何熟练的使用Netty解决实际问题,抛弃以往边讲应用边分析源码的培训模式所带来的“高不成低不就”情况,在已经能够熟练使用、并且清楚开发流程的基础上再...

    深入理解Netty线程模型

    从这篇文章中,大家可以学习到如下知识:什么是I/O多路复用Reactor三种线程模型Netty线程模型NioEventLoop源码分析JDKepollbug学习I/O多路复用之前,我们先来了解如下几个概念:阻塞I/O:客户端从socket中读取数据或...

    基于javatcpsocket通信的拆包和装包源码-network_and_protocol:网络协议与Netty

    socket通信的拆包和装包源码 网络编程原理与Netty TCP连接的三次握手与四次挥手 图解分析TCP三次握手协议 为什么要三次握手,不能像http或者UDP一样直接传输 ​ 主要是为了防止已失效的连接请求报文段突然又传到了B,...

    JAVA上百实例源码以及开源项目源代码

    Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这款实例会对你有所帮助。 Calendar万年历 1个目标文件 EJB 模拟银行ATM流程及操作源代码 6个目标文件,EJB来...

    JAVA上百实例源码以及开源项目

    笔者当初为了学习JAVA,收集了很多经典源码,源码难易程度分为初级、中级、高级等,详情看源码列表,需要的可以直接下载! 这些源码反映了那时那景笔者对未来的盲目,对代码的热情、执着,对IT的憧憬、向往!此时此...

Global site tag (gtag.js) - Google Analytics