`

Netty5源码分析--1.服务端启动过程详解

 
阅读更多

实例

样例代码来自于io.netty.example.telnet.TelnetServer,完整样例请参考NettyExample工程。

01 public class TelnetServer {
02   
03 private final int port;
04   
05 public TelnetServer(int port) {
06     this.port = port;
07 }
08   
09 public void run() throws Exception {
10     EventLoopGroup bossGroup = new NioEventLoopGroup();//bossGroup线程池用来接受客户端的连接请求
11     EventLoopGroup workerGroup = new NioEventLoopGroup();//workerGroup线程池用来处理boss线程池里面的连接的数据
12     try {
13         ServerBootstrap b = new ServerBootstrap();
14         b.group(bossGroup, workerGroup)
15          .channel(NioServerSocketChannel.class)
16          .childHandler(new TelnetServerInitializer());//ChannelInitializer是一个特殊的handler,用来初始化ChannelPipeline里面的handler链。 这个特殊的ChannelInitializer在加入到pipeline后,在initChannel调用结束后,自身会被remove掉,从而完成初始化的效果(后文会详述)。
17   
18 //AbstractBootstrap.option()用来设置ServerSocket的参数,AbstractBootstrap.childOption()用来设置Socket的参数。
19   
20         b.bind(port).sync().channel().closeFuture().sync();
21     } finally {
22         bossGroup.shutdownGracefully();
23         workerGroup.shutdownGracefully();
24     }
25 }
26   
27 public static void main(String[] args) throws Exception {
28     int port;
29     if (args.length > 0) {
30         port = Integer.parseInt(args[0]);
31     } else {
32         port = 8080;
33     }
34     new TelnetServer(port).run();
35 }
36 }

针对上述代码,还需要补充介绍一些内容:

在调用ctx.write(Object)后需要调用ctx.flush()方法,这样才能将数据发出去。或者直接调用 ctx.writeAndFlush(msg)方法。

通常使用这种方式来实例化ByteBuf:final ByteBuf time = ctx.alloc().buffer(4); ,而不是直接使用ByteBuf子类的构造方法

另外,还需要在处理基于流的传输协议TCP/IP的数据时,注意报文和业务程序实际能够接收到的数据之间的关系。 假如你发送了2个报文,底层是发送了两组字节。但是操作系统的TCP栈是有缓存的,它可能把这两组字节合并成一组字节,然后再给业务程序使用。但是业务程序往往需要根据把这一组字节还原成原来的两组字节,但是不幸的是,业务程序往往无法直接还原,除非在报文上做了些特殊的约定。比如报文是定长的或者有明确的分隔符。

服务端启动服务

当`TelnetServer启动时,依次完成如下步骤:

NioEventLoopGroup初始化

NioEventLoopGroup构造方法被调用时,首先初始化父类MultithreadEventLoopGroup,触发父类获得默认的线程数,其值默认是Runtime.getRuntime().availableProcessors() * 2

1 static {
2     DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
3             "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
4   
5     if (logger.isDebugEnabled()) {
6         logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
7     }
8 }

接着调用NioEventLoopGroup自身的构造器,依次执行下面的构造器。

01 public NioEventLoopGroup() {
02     this(0);
03 }
04   
05 public NioEventLoopGroup(int nThreads, Executor executor) {
06     this(nThreads, executor, SelectorProvider.provider());
07 }
08   
09 public NioEventLoopGroup(int nThreads) {
10     this(nThreads, (Executor) null);
11 }
12   
13 public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
14     this(nThreads, threadFactory, SelectorProvider.provider());
15 }
16   
17 public NioEventLoopGroup(
18         int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
19     super(nThreads, threadFactory, selectorProvider);
20 }

继续调用父类MultithreadEventLoopGroup的构造器,该构造器又调用了父类构造器。

1 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)    {
2     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
3 }

下面的构造方法主要完成以下几件事情:

  1. 设置默认DefaultThreadFactory线程工厂,主要做了2件事,设置线程池名称和线程名称

  2. 初始化children数组,然后通过调用NioEventLoopGroup.newChild方法完成child属性设置。

    01 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    02 if (nThreads <= 0) {
    03     throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    04 }
    05   
    06 if (executor == null) {
    07     executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    08 }
    09   
    10 children = new EventExecutor[nThreads];
    11 for (int i = 0; i < nThreads; i ++) {
    12     boolean success = false;
    13     try {
    14         children[i] = newChild(executor, args);//tag
    15         success = true;
    16     } catch (Exception e) {
    17         // TODO: Think about if this is a good exception type
    18         throw new IllegalStateException("failed to create a child event loop", e);
    19     } finally {
    20         if (!success) {
    21             for (int j = 0; j < i; j ++) {
    22                 children[j].shutdownGracefully();
    23             }
    24   
    25             for (int j = 0; j < i; j ++) {
    26                 EventExecutor e = children[j];
    27                 try {
    28                     while (!e.isTerminated()) {
    29                         e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    30                     }
    31                 } catch (InterruptedException interrupted) {
    32                     Thread.currentThread().interrupt();
    33                     break;
    34                 }
    35             }
    36         }
    37     }
    38 }

    在newChild方法中,主要完成构建NioEventLoop实例

    1 @Override
    2 protected EventLoop newChild(Executor executor, Object... args) throws  Exception {
    3 return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
    4 }

    下面的super(parent, executor, false);主要是设置NioEventLoopGroup是NioEventLoop的parent。然后调用openSelector()创建Selector对象。

    1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
    2 super(parent, executor, false);
    3 if (selectorProvider == null) {
    4     throw new NullPointerException("selectorProvider");
    5 }
    6 provider = selectorProvider;
    7 selector = openSelector();
    8 }

首先,先初始化Selector对象,然后再初始化SelectedSelectionKeySet,设置其属性keysA = new SelectionKey[1024]; keysB = keysA.clone();。进行了一个优化,设置了sun.nio.ch.SelectorImplselectedKeyspublicSelectedKeys属性。根据NioEventLoop.run()方法内部直接调用 processSelectedKeysOptimized(selectedKeys.flip());并且没有直接使用selector.selectedKeys()这两处代码,笔者猜测正是因为在此时通过反射设置了属性,所以NioEventLoop.run()才能正常工作。

01 private Selector NioEventLoop.openSelector() {
02     final Selector selector;
03     try {
04         selector = provider.openSelector();
05     } catch (IOException e) {
06         throw new ChannelException("failed to open a new selector", e);
07     }
08   
09     if (DISABLE_KEYSET_OPTIMIZATION) {
10         return selector;
11     }
12   
13     try {
14         SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
15   
16         Class<?> selectorImplClass =
17                 Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
18   
19         // Ensure the current selector implementation is what we can instrument.
20         if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
21             return selector;
22         }
23   
24         Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
25         Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
26   
27         selectedKeysField.setAccessible(true);
28         publicSelectedKeysField.setAccessible(true);
29   
30         selectedKeysField.set(selector, selectedKeySet);
31         publicSelectedKeysField.set(selector, selectedKeySet);
32   
33         selectedKeys = selectedKeySet;
34         logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
35     } catch (Throwable t) {
36         selectedKeys = null;
37         logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
38     }
39   
40     return selector;
41 }

最后循环完成children数组的初始化children[i] = newChild(executor, args);,进而完成NioEventLoopGroup对象初始化。

小结

此时再结合Eclipse的DEBUG视图,观察bossGroup的属性,可以基本看到完成如下几个事情

  • 创建NioEventLoopGroup对象
  • 获得默认线程池数目大小,数值为N
  • 设置线程池名称和线程名称
  • 循环创建出来 N个NioEventLoop对象,每个NioEventLoop都设置了相同的parent,executor和不同的selector实例。

ServerBootstrap 初始化

1 ServerBootstrap b = new ServerBootstrap();

上面这段代码内涵平平,主要设置group属性是bossGroup,childGroup属性是workerGroup。
没啥其他复杂属性赋值。主要值得一提的就是channel方法的设计,通过传递class对象,然后通过反射来实例化具体的Channel实例。

b <br />.bind(port) <br />.sync() <br />.channel() <br />.closeFuture() <br />.sync();,这个方法的内容很多,详见下述分析。

b.bind(port)方法会调用下面的doBind方法,在doBind方法中会完成Channel的初始化和绑定端口。有2个方法需要tag,分别是 tag1 和 tag2

01 private ChannelFuture doBind(final SocketAddress localAddress) {
02     final ChannelFuture regFuture = initAndRegister();//tag1
03     final Channel channel = regFuture.channel();
04     if (regFuture.cause() != null) {
05         return regFuture;
06     }
07   
08     final ChannelPromise promise;
09     if (regFuture.isDone()) {
10         promise = channel.newPromise();
11         doBind0(regFuture, channel, localAddress, promise);//tag2
12     } else {
13         // Registration future is almost always fulfilled already, but just in case it's not.
14         promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
15         regFuture.addListener(new ChannelFutureListener() {
16             @Override
17             public void operationComplete(ChannelFuture future) throws Exception {
18                 doBind0(regFuture, channel, localAddress, promise);
19             }
20         });
21     }
22   
23     return promise;
24 }

tag1 initAndRegister,里面完成Channel实例创建,实例化和注册channel到selector上。

01 final ChannelFuture initAndRegister() {
02     Channel channel;
03     try {
04         channel = createChannel();//tag1.1
05     } catch (Throwable t) {
06         return VoidChannel.INSTANCE.newFailedFuture(t);
07     }
08   
09     try {
10         init(channel);//tag1.2
11     } catch (Throwable t) {
12         channel.unsafe().closeForcibly();
13         return channel.newFailedFuture(t);
14     }
15   
16     ChannelPromise regFuture = channel.newPromise();
17     channel.unsafe().register(regFuture);//tag1.3
18     if (regFuture.cause() != null) {
19         if (channel.isRegistered()) {
20             channel.close();
21         } else {
22             channel.unsafe().closeForcibly();
23         }
24     }

tag1.1,调用ServerBootstrap.createChannel() ,通过反射完成Channel实例创建。这里使用了childGroup这个属性,即workGroup线程池。

1 @Override
2 Channel createChannel() {
3     EventLoop eventLoop = group().next();
4     return channelFactory().newChannel(eventLoop, childGroup);//tag1.1.1
5   
6 }

tag1.1.1,此时将断点打到NioServerSocketChannel的构造方法上

1 public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
2     super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);//tag1.1.1.1
3     config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());//tag1.1.1.2
4   
5 }

tag1.1.1.1,这段代码主要完成3件事。

第一个是在NioServerSocketChannel.newSocket()调用了ServerSocketChannel.open(),完成了javaChannel的创建

1 private static ServerSocketChannel newSocket() {
2     try {
3         return ServerSocketChannel.open();
4     } catch (IOException e) {
5         throw new ChannelException(
6                 "Failed to open a server socket.", e);
7     }
8 }

第二个是在AbstractNioChannel的构造方法中调用了ch.configureBlocking(false)方法

01 protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
02     super(parent, eventLoop);//tag1.1.1.1.1
03     this.ch = ch;
04     this.readInterestOp = readInterestOp;
05     try {
06         ch.configureBlocking(false);
07     } catch (IOException e) {
08         try {
09             ch.close();
10         } catch (IOException e2) {
11             if (logger.isWarnEnabled()) {
12                 logger.warn(
13                         "Failed to close a partially initialized socket.", e2);
14             }
15         }
16   
17         throw new ChannelException("Failed to enter non-blocking mode.", e);
18     }
19 }

tag1.1.1.1.1中,在AbstractChannel(Channel parent, EventLoop eventLoop)中,进行了两个重要操作:unsafe = newUnsafe();pipeline = new DefaultChannelPipeline(this);

1 protected AbstractChannel(Channel parent, EventLoop eventLoop) {
2     this.parent = parent;
3     this.eventLoop = validate(eventLoop);
4     unsafe = newUnsafe();
5     pipeline = new DefaultChannelPipeline(this);//tag1.1.1.1.1.1
6 }

tag1.1.1.1.1.1,设置了HeadHandler和TailHandler。这两个类也比较重要。

public DefaultChannelPipeline(AbstractChannel channel) {

01 if (channel == null) {
02         throw new NullPointerException("channel");
03     }
04     this.channel = channel;
05   
06     TailHandler tailHandler = new TailHandler();
07     tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);//tag1.1.1.1.1.1.1
08   
09     HeadHandler headHandler = new HeadHandler(channel.unsafe());
10     head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
11   
12     head.next = tail;
13     tail.prev = head;
14 }

tag1.1.1.1.1.1.1,这个方法完成了DefaultChannelHandlerContext的对象的初始化。这个类也是核心类,先暂时把它当成个黑盒,会在后面重点分析。

此时,我们方法调用栈结束,然后回到 tag1.1.1.2 这段代码上来。 在DefaultServerSocketChannelConfig中构造方法中完成了channel的参数设置

至此,才完成tag1.1 AbstractBootstrap.createChannel()方法的执行。现在又开始 tag1.2的代码片段。该 AbstractBootstrap.init(Channel channel) 方法里面主要涉及到Parent Channel 和 Child Channel的option和attribute 设置,并将客户端设置的参数覆盖到默认参数中;最后,还将childHandler(new TelnetServerInitializer())中设置的handler加入到pipeline()中。代码见下。

void init(Channel channel) throws Exception {

01 final Map<ChannelOption<?>, Object> options = options();
02     synchronized (options) {
03         channel.config().setOptions(options);
04     }
05   
06     final Map<AttributeKey<?>, Object> attrs = attrs();
07     synchronized (attrs) {
08         for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
09             @SuppressWarnings("unchecked")
10             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
11             channel.attr(key).set(e.getValue());
12         }
13     }
14   
15     ChannelPipeline p = channel.pipeline();
16     if (handler() != null) {
17         p.addLast(handler());
18     }
19   
20     final ChannelHandler currentChildHandler = childHandler;
21     final Entry<ChannelOption<?>, Object>[] currentChildOptions;
22     final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
23     synchronized (childOptions) {
24         currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
25     }
26     synchronized (childAttrs) {
27         currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
28     }
29   
30     p.addLast(new ChannelInitializer<Channel>() {//tag1.2.1
31         @Override
32         public void initChannel(Channel ch) throws Exception {
33             ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
34                     currentChildAttrs));
35         }
36     });
37 }

tag1.2.1中,此时pipeline中又多了一个handler:内部类ServerBootstrap$1,此时数组的链表情况如下:HeadHandler,ServerBootstrap$1和TailHandler。另外,再额外吐槽一句,p.addLast方法并不是把ServerBootstrap$1放到tail上,而是放到tail的前一个节点上。所以,这个addLast方法命名很是误解。

至此完成tag1.2执行,开始执行tag1.3 channel.unsafe().register(regFuture);这段代码。该方法内部接着执行执行tag1.3.1的代码。

01 public final void register(final ChannelPromise promise) {
02         if (eventLoop.inEventLoop()) {
03             register0(promise);
04         } else {
05             try {
06                 eventLoop.execute(new Runnable() {
07                     @Override
08                     public void run() {//tag1.3.1
09                         register0(promise);
10                     }
11                 });
12             } catch (Throwable t) {
13                 logger.warn(
14                         "Force-closing a channel whose registration task was not accepted by an event loop: {}",
15                         AbstractChannel.this, t);
16                 closeForcibly();
17                 closeFuture.setClosed();
18                 promise.setFailure(t);
19             }
20         }
21     }

tag1.3.1,该片段主要执行doRegister();pipeline.fireChannelRegistered();//tag1.3.1.2

01 private void register0(ChannelPromise promise) {
02         try {
03             // check if the channel is still open as it could be closed in the mean time when the register
04             // call was outside of the eventLoop
05             if (!ensureOpen(promise)) {
06                 return;
07             }
08             doRegister();//tag1.3.1.1
09             registered = true;
10             promise.setSuccess();
11             pipeline.fireChannelRegistered();//tag1.3.1.2
12             if (isActive()) {
13                 pipeline.fireChannelActive();
14             }
15         } catch (Throwable t) {
16             // Close the channel directly to avoid FD leak.
17             closeForcibly();
18             closeFuture.setClosed();
19             if (!promise.tryFailure(t)) {
20                 logger.warn(
21                         "Tried to fail the registration promise, but it is complete already. " +
22                                 "Swallowing the cause of the registration failure:", t);
23             }
24         }
25     }

tag1.3.1.1 将代码片段将javachannel注册到selector上,并把selectionKey属性赋值

01 protected void AbstractNioChannel.doRegister() throws Exception {
02     boolean selected = false;
03     for (;;) {
04         try {
05             selectionKey = javaChannel().register(eventLoop().selector, 0, this);
06             return;
07         } catch (CancelledKeyException e) {
08             if (!selected) {
09                 // Force the Selector to select now as the "canceled" SelectionKey may still be
10                 // cached and not removed because no Select.select(..) operation was called yet.
11                 eventLoop().selectNow();
12                 selected = true;
13             } else {
14                 // We forced a select operation on the selector before but the SelectionKey is still cached
15                 // for whatever reason. JDK bug ?
16                 throw e;
17             }
18         }
19     }
20 }

tag1.3.1.2,这个方法里面有一堆事情要讲。先暂且放下,在后文讲到ChannelPipeline时会再次回来看这段代码。

public ChannelPipeline DefaultChannelPipeline.fireChannelRegistered() {

1 head.fireChannelRegistered();
2     return this;
3 }

此时终于完成 tag1 代码片段执行,开始执行 tag2 的代码片段。

private static void doBind0(

01 final ChannelFuture regFuture, final Channel channel,
02         final SocketAddress localAddress, final ChannelPromise promise) {
03   
04     // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
05     // the pipeline in its channelRegistered() implementation.
06     channel.eventLoop().execute(new Runnable() {
07         @Override
08         public void run() {
09             if (regFuture.isSuccess()) {
10                 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);//tag2.1
11             } else {
12                 promise.setFailure(regFuture.cause());
13             }
14         }
15     });
16 }

tag2.1,该方法内部有调用了pipeline的方法了(在tag1.3.1.2 中也出现了pipeline调用)。 好吧,是时候介绍pipeline了。

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

1 return pipeline.bind(localAddress, promise);//tag2.1.1
2 }

ChannelPipeline

DefaultChannelPipelineChannelPipeline的实现类,DefaultChannelPipeline内部维护了两个指针:final DefaultChannelHandlerContext head; final DefaultChannelHandlerContext tail;,分别指向链表的头部和尾部;而DefaultChannelHandlerContext内部是一个链表结构:volatile DefaultChannelHandlerContext next;volatile DefaultChannelHandlerContext prev;,而每个DefaultChannelHandlerContextChannelHandler实例一一对应。

从上面可以看到,这是个经典的Intercepting Filter模式实现。下面我们再接着从tag1.3.1.2代码看起,pipeline.fireChannelRegistered();依次执行如下两个方法。上文也已经说明,此时handler链是HeadHandler,ServerBootstrap$1和TailHandler。

01 @Override
02 public ChannelPipeline DefaultChannelPipeline.fireChannelRegistered() {
03     head.fireChannelRegistered();
04     return this;
05 }
06   
07 public ChannelHandlerContext ChannelHandlerContext.fireChannelRegistered() {
08     DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED); //tag 1.3.1.2.1
09     next.invoker.invokeChannelRegistered(next); //tag1.3.1.2.2
10   
11     return this;
12 }
13   
14 private DefaultChannelHandlerContext DefaultChannelHandlerContext.findContextInbound(int mask) {
15     DefaultChannelHandlerContext ctx = this;
16     do {
17         ctx = ctx.next;
18     } while ((ctx.skipFlags & mask) != 0);
19     return ctx;
20 }

tag 1.3.1.2.1,针对这个findContextInbound方法需要再补充下,里面ServerBootstrap$1是继承自ChannelInitializer,而ChannelInitializer.channelRegistered是没有@Skip注解的。呃,@Skip注解又有何用。这个要结合DefaultChannelHandlerContext.skipFlags0(Class<? extends ChannelHandler> handlerType)。这个skipFlags0方法返回一个整数,如果该方法上标记了@Skip注解,那么表示该方法在Handler被执行时,需要被忽略。所以,此时do {ctx = ctx.next;} while ((ctx.skipFlags & mask) != 0);片段的执行结果返回的是ServerBootstrap$1这个Handler。

这里在额外说一句,这个ChannelHandlerAdapter里面的方法几乎都被加了@Skip标签。

01 private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
02     int flags = 0;
03     try {
04         if (handlerType.getMethod(
05                 "handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
06             flags |= MASK_HANDLER_ADDED;
07         }
08         if (handlerType.getMethod(
09                 "handlerRemoved", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
10             flags |= MASK_HANDLER_REMOVED;
11         }
12         if (handlerType.getMethod(
13                 "exceptionCaught", ChannelHandlerContext.class, Throwable.class).isAnnotationPresent(Skip.class)) {
14             flags |= MASK_EXCEPTION_CAUGHT;
15         }
16         if (handlerType.getMethod(
17                 "channelRegistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
18             flags |= MASK_CHANNEL_REGISTERED;
19         }
20         if (handlerType.getMethod(
21                 "channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
22             flags |= MASK_CHANNEL_ACTIVE;
23         }
24         if (handlerType.getMethod(
25                 "channelInactive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
26             flags |= MASK_CHANNEL_INACTIVE;
27         }
28         if (handlerType.getMethod(
29                 "channelRead", ChannelHandlerContext.class, Object.class).isAnnotationPresent(Skip.class)) {
30             flags |= MASK_CHANNEL_READ;
31         }
32         if (handlerType.getMethod(
33                 "channelReadComplete", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
34             flags |= MASK_CHANNEL_READ_COMPLETE;
35         }
36         if (handlerType.getMethod(
37                 "channelWritabilityChanged", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
38             flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
39         }
40         if (handlerType.getMethod(
41                 "userEventTriggered", ChannelHandlerContext.class, Object.class).isAnnotationPresent(Skip.class)) {
42             flags |= MASK_USER_EVENT_TRIGGERED;
43         }
44         if (handlerType.getMethod(
45                 "bind", ChannelHandlerContext.class,
46                 SocketAddress.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
47             flags |= MASK_BIND;
48         }
49         if (handlerType.getMethod(
50                 "connect", ChannelHandlerContext.class, SocketAddress.class, SocketAddress.class,
51                 ChannelPromise.class).isAnnotationPresent(Skip.class)) {
52             flags |= MASK_CONNECT;
53         }
54         if (handlerType.getMethod(
55                 "disconnect", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
56             flags |= MASK_DISCONNECT;
57         }
58         if (handlerType.getMethod(
59                 "close", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
60             flags |= MASK_CLOSE;
61         }
62         if (handlerType.getMethod(
63                 "read", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
64             flags |= MASK_READ;
65         }
66         if (handlerType.getMethod(
67                 "write", ChannelHandlerContext.class,
68                 Object.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
69             flags |= MASK_WRITE;
70   
71             // flush() is skipped only when write() is also skipped to avoid the situation where
72             // flush() is handled by the event loop before write() in staged execution.
73             if (handlerType.getMethod(
74                     "flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
75                 flags |= MASK_FLUSH;
76             }
77         }
78     } catch (Exception e) {
79         // Should never reach here.
80         PlatformDependent.throwException(e);
81     }
82   
83     return flags;
84 }

此时,tag1.3.1.2.1 代码片段执行完毕,现在开始tag1.3.1.2.2 执行。

01 @Override
02 public void DefaultChannelHandlerInvoker.invokeChannelRegistered(final ChannelHandlerContext ctx) {
03     if (executor.inEventLoop()) {
04         invokeChannelRegisteredNow(ctx);
05     } else {
06         executor.execute(new Runnable() {
07             @Override
08             public void run() {
09                 invokeChannelRegisteredNow(ctx);
10             }
11         });
12     }
13 }
14   
15 public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
16     try {
17         ctx.handler().channelRegistered(ctx);
18     } catch (Throwable t) {
19         notifyHandlerException(ctx, t);
20     }
21 }
22   
23 由于ServerBootstrap$1(ChannelInitializer<C>)这个类继承了ChannelInitializer,所以会执行了ChannelInitializer.channelRegistered这个方法。
24   
25 @Override
26 @SuppressWarnings("unchecked")
27 public final void ChannelInitializer.channelRegistered(ChannelHandlerContext ctx) throws Exception {
28     ChannelPipeline pipeline = ctx.pipeline();
29     boolean success = false;
30     try {
31         initChannel((C) ctx.channel());//tag1.3.1.2.2.1
32         pipeline.remove(this);//tag1.3.1.2.2.2
33         ctx.fireChannelRegistered();//tag1.3.1.2.2.3
34         success = true;
35     } catch (Throwable t) {
36         logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
37     } finally {
38         if (pipeline.context(this) != null) {
39             pipeline.remove(this);
40         }
41         if (!success) {
42             ctx.close();
43         }
44     }
45 }

在tag1.3.1.2.2.1里,又回调了下面的initChannel方法。该方法把ServerBootstrapAcceptor这个Handler加入到Pipeline中;此时handler链情况如下:HeadHandler,ServerBootstrap$1,ServerBootstrap$ServerBootstrapAcceptor和TailHandler

1 p.addLast(new ChannelInitializer<Channel>() {
2         @Override
3         public void initChannel(Channel ch) throws Exception {
4             ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
5                     currentChildAttrs));
6         }
7     });

在 tag1.3.1.2.2.2里,通过执行pipeline.remove(this);又把ServerBootstrap$1这个Handler给删除了,从而完成初始化的效果。需要提醒的是,ServerBootstrapAcceptor的currentChildHandler属性包含了在客户端代码注册的TelnetServerInitializer类。

在tag1.3.1.2.2.3里,通过执行ctx.fireChannelRegistered();又找到了下一个handler,

public ChannelHandlerContext DefaultChannelHandlerContext.fireChannelRegistered() {

1 DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
2     next.invoker.invokeChannelRegistered(next);
3     return this;
4 }

这段逻辑和上述基本一样, findContextInbound内部执行时,会跳过ServerBootstrapAcceptor这个handler,最终找到找到tailHandler,并执行channelRegistered()这个方法。就这样,最终完成了整个 pipeline.fireChannelRegistered();执行。

static final class TailHandler extends ChannelHandlerAdapter {

1 @Override
2 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {}
3   
4 //省略下面的方法

}

下面我们再趁热打铁,回头看看 tag2.1代码的执行逻辑。

01 public ChannelFuture AbstractChannel.bind(SocketAddress localAddress, ChannelPromise promise)  {
02     return pipeline.bind(localAddress, promise);//tag2.1.1
03
04   
05   
06  @Override
07 public ChannelFuture DefaultChannelPipeline.bind(SocketAddress localAddress, ChannelPromise promise) {
08     return pipeline.bind(localAddress, promise);
09 }
10   
11  @Override
12 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
13     return tail.bind(localAddress, promise); //tag2.1.1.1
14 }

tag2.1.1.1,执行到这里,发现是tail.bind,而不是head.bind。

1 @Override
2 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
3     DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);
4     next.invoker.invokeBind(next, localAddress, promise);
5     return promise;
6 }

@Override

01 public void DefaultChannelHandlerInvokerinvokeBind(
02         final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
03     if (localAddress == null) {
04         throw new NullPointerException("localAddress");
05     }
06     validatePromise(ctx, promise, false);
07   
08     if (executor.inEventLoop()) {
09         invokeBindNow(ctx, localAddress, promise);
10     } else {
11         safeExecuteOutbound(new Runnable() {
12             @Override
13             public void run() {
14                 invokeBindNow(ctx, localAddress, promise);
15             }
16         }, promise);
17     }
18 }
19   
20   
21  public static void ChannelHandlerInvokerUtil.invokeBindNow(
22         final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
23     try {
24         ctx.handler().bind(ctx, localAddress, promise);
25     } catch (Throwable t) {
26         notifyOutboundHandlerException(t, promise);
27     }
28 }
29   
30 @Override
31 public void DefaultChannelPipeline.HeadHandler.bind(
32             ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
33             throws Exception {
34         unsafe.bind(localAddress, promise);
35     }
36   
37  @Override
38  public final void AbstractChannel.AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise) {
39         if (!ensureOpen(promise)) {
40             return;
41         }
42   
43         // See: https://github.com/netty/netty/issues/576
44         if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
45             Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
46             localAddress instanceof InetSocketAddress &&
47             !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
48             // Warn a user about the fact that a non-root user can't receive a
49             // broadcast packet on *nix if the socket is bound on non-wildcard address.
50             logger.warn(
51                     "A non-root user can't receive a broadcast packet if the socket " +
52                     "is not bound to a wildcard address; binding to a non-wildcard " +
53                     "address (" + localAddress + ") anyway as requested.");
54         }
55   
56         boolean wasActive = isActive();
57         try {
58             doBind(localAddress);//tag2.1.1.1.1
59          } catch (Throwable t) {
60             promise.setFailure(t);
61             closeIfClosed();
62             return;
63         }
64         if (!wasActive && isActive()) {
65             invokeLater(new Runnable() {//tag2.1.1.1.2
66                 @Override
67                 public void run() {
68                     pipeline.fireChannelActive();//tag2.1.1.1.3
69                 }
70             });
71         }
72         promise.setSuccess();//tag2.1.1.1.4
73     }

在tag2.1.1.1.1里,执行真正的bind端口。

01 protected void doBind(SocketAddress localAddress) throws Exception {
02     javaChannel().socket().bind(localAddress, config.getBacklog());
03 }    
04   
05 在tag2.1.1.1.2里,执行如下方法,`eventLoop().execute(task); `在后续分析。现在暂时忽略。    
06 private void invokeLater(Runnable task) {
07         // This method is used by outbound operation implementations to trigger an inbound event later.
08         // They do not trigger an inbound event immediately because an outbound operation might have been
09         // triggered by another inbound event handler method.  If fired immediately, the call stack
10         // will look like this for example:
11         //
12         //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
13         //   -> handlerA.ctx.close()
14         //      -> channel.unsafe.close()
15         //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
16         //
17         // which means the execution of two inbound handler methods of the same handler overlap undesirably.
18         eventLoop().execute(task);
19     }

这里需要说一下,虽然先执行了invokeLater该方法,但是仅仅是把给task加入到队列中,然后等 tag2.1.1.1.4 方法执行后,在下一个循环中再继续执行。

01 @Override
02 public ChannelPromise DefaultChannelPromise.setSuccess() {
03     return setSuccess(null);
04 }
05   
06 @Override
07 public ChannelPromise setSuccess(Void result) {
08     super.setSuccess(result);
09     return this;
10 }
11   
12 @Override
13 public Promise<V> setSuccess(V result) {
14     if (setSuccess0(result)) {// tag2.1.1.1.4.1
15         notifyListeners();// tag2.1.1.1.4.2
16         return this;
17     }
18     throw new IllegalStateException("complete already: " + this);
19 }
20   
21  private boolean setSuccess0(V result) {
22     if (isDone()) {
23         return false;
24     }
25   
26     synchronized (this) {
27         // Allow only once.
28         if (isDone()) {
29             return false;
30         }
31         if (result == null) {
32             this.result = SUCCESS;// tag2.1.1.1.4.1.1
33         } else {
34             this.result = result;
35         }
36         if (hasWaiters()) {
37             notifyAll();
38         }
39     }
40     return true;
41 }

在 tag2.1.1.1.4.1.1 设置了成功状态,然后该方法返回,继续执行了tag2.1.1.1.4.2方法。由于listeners为 null,所以直接返回。

1 private void notifyListeners() {
2   
3     Object listeners = this.listeners;
4     if (listeners == null) {
5         return;
6     }
7     // 省略XXXXXX
8 }

此时,程序完成了tag2.1 代码执行,开始继续循环。此时执行 tag2.1.1.1.3里的代码,即执行pipeline.fireChannelActive();方法。

1 public ChannelPipeline fireChannelActive() {
2     head.fireChannelActive();//tag2.1.1.1.3.1
3   
4     if (channel.config().isAutoRead()) {
5         channel.read();//tag2.1.1.1.3.2
6     }
7   
8     return this;
9 }

在tag2.1.1.1.3.1里,和上述逻辑一样,最终执行到TailHandler这里。

static final class TailHandler extends ChannelHandlerAdapter {

1 @Override
2     public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
3   
4     @Override
5     public void channelActive(ChannelHandlerContext ctx) throws Exception { }
6   
7     //下省略方法

}

在tag2.1.1.1.3.2里,由于channel.config().isAutoRead()默认返回true;

01 @Override
02 public ChannelPipeline read() {
03     tail.read();
04     return this;
05 }
06   
07  @Override
08     public void DefaultChannelPipeline.HeadHandler.read(ChannelHandlerContext ctx) {
09         unsafe.beginRead();
10     }
11   
12   
13  @Override
14  public void AbstractChannel.AbstractUnsafe.beginRead() {
15         if (!isActive()) {
16             return;
17         }
18   
19         try {
20             doBeginRead();
21         } catch (final Exception e) {
22             invokeLater(new Runnable() {
23                 @Override
24                 public void run() {
25                     pipeline.fireExceptionCaught(e);
26                 }
27             });
28             close(voidPromise());
29         }
30     }

此属性 readInterestOp值为16,interestOps & readInterestOp值为0,所以执行了selectionKey.interestOps(interestOps | readInterestOp);,等同于执行了selectionKey.interestOps(SelectionKey.OP_ACCEPT);

01 protected void AbstractNioChannel.doBeginRead() throws Exception {
02     if (inputShutdown) {
03         return;
04     }
05   
06     final SelectionKey selectionKey = this.selectionKey;
07     if (!selectionKey.isValid()) {
08         return;
09     }
10   
11     final int interestOps = selectionKey.interestOps();
12     if ((interestOps & readInterestOp) == 0) {
13         selectionKey.interestOps(interestOps | readInterestOp);
14     }
15 }

至此,整个DefaultPromise.bind方法执行完毕,下面开始执行DefaultPromise.sync()。而此时在 tag2.1.1.1.4.1.1 已经将值设为SUCCESS了,所以不需要等待,直接返回。

01 @Override
02 public Promise<V> DefaultPromise.sync() throws InterruptedException {
03     await();
04     rethrowIfFailed();
05     return this;
06 }
07   
08  @Override
09 public Promise<V> DefaultPromise.await() throws InterruptedException {
10     if (isDone()) {
11         return this;
12     }
13   
14     if (Thread.interrupted()) {
15         throw new InterruptedException(toString());
16     }
17   
18     synchronized (this) {
19         while (!isDone()) {
20             checkDeadLock();
21             incWaiters();
22             try {
23                 wait();
24             } finally {
25                 decWaiters();
26             }
27         }
28     }
29     return this;
30 }

然后系统接着执行了 b.bind(port).sync().channel().closeFuture().sync();的后半截方法“channel().closeFuture().sync()”方法。而由于closeFuture这个属性的执行结果一直没有赋值,所以被wait了,从而一直处于wait状态。

至此,主线程处于wait状态,并通过子线程无限循环,来完成客户端请求。


小结

通过channel方法设置不同的通道类型,通过childHandler设置SocketChannel的Handler链

bind(port)完成的职责很多,远不同于ServerSocket.bind方法。具体包含:initAndRegister和doBind0。

其中initAndRegister又细化了createChannel() 和init(channel)以及channel.unsafe().register(regFuture)这3个大步骤。

  • createChannel内部 使用了childGroup,group().next(),ServerSocketChannel.open()这3个属性来创建NioServerSocketChannel实例,并初始化了默认参数DefaultServerSocketChannelConfig和DefaultChannelPipeline对象。DefaultChannelPipeline对象默认包含设置了HeadHandler和TailHandler。然后设置了ch.configureBlocking(false)模式,并将readInterestOp赋值为SelectionKey.OP_ACCEPT。

  • init(channel方法里面主要涉及到将Parent Channel 和 Child Channel的option和attribute 设值,并将客户端设置的参数覆盖到默认参数中;最后,还将childHandler(new TelnetServerInitializer())中设置的handler加入到pipeline()中。

  • channel.unsafe().register(regFuture) 把ServerBootstrapAcceptor这个Handler加入到Pipeline中

doBind0方法内部执行了javaChannel().register(eventLoop().selector, 0, this); 触发了服务端的channelActive() 事件,并设置了 selectionKey.interestOps(SelectionKey.OP_ACCEPT);


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics