`
ahua186186
  • 浏览: 554070 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

summercool-hsf &Netty3.X总结3--服务端启动环节

 
阅读更多
1.初始化NioServerSocketChannelFactory:

(1)初始化并启动NioServerBoss线程(默认1个):

第一步:通过构造函数里面的openSelector()方法创建Selector并启动一个boss线程:
   /**
     * Opens the {@link Selector} for this instance and hands the selector-loop
     * runnable to the executor through {@link DeadLockProofWorker}.
     * <p>
     * If the runnable cannot be started, the freshly opened selector is closed,
     * the field is reset to {@code null}, and the original failure propagates
     * to the caller.
     *
     * @param determiner passed through to {@code newThreadRenamingRunnable}
     * @throws ChannelException if the selector cannot be created
     */
    private void openSelector(ThreadNameDeterminer determiner) {
        try {
            selector = SelectorUtil.open();
        } catch (Throwable cause) {
            throw new ChannelException("Failed to create a selector.", cause);
        }

        // Tracks whether the start call below completed without throwing.
        boolean started = false;
        try {
            DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
            started = true;
        } finally {
            if (!started) {
                // The pending exception keeps propagating after the cleanup.
                releaseSelectorAfterFailedStart();
            }
        }
        assert selector != null && selector.isOpen();
    }

    /**
     * Closes the selector opened by {@link #openSelector(ThreadNameDeterminer)}
     * and clears the field; a failure to close is only logged.
     */
    private void releaseSelectorAfterFailedStart() {
        try {
            selector.close();
        } catch (Throwable closeFailure) {
            logger.warn("Failed to close a selector.", closeFailure);
        }
        selector = null;
    }



第二步:在线程的run()方法中循环调用Selector的select()操作,检查是否有就绪的通道,
然后依次执行processTaskQueue()和process(selector)(后者负责把NioAcceptedSocketChannel注册到NioWorker线程):

 /**
  * Selector loop. Records the executing thread, releases {@code startupLatch},
  * and then repeatedly: selects, checks whether the select returned
  * prematurely (rebuilding the selector after 1024 consecutive premature
  * returns when {@code SelectorUtil.EPOLL_BUG_WORKAROUND} is set), runs the
  * queued tasks, and processes the selector. When {@code shutdown} is
  * observed, the remaining keys are closed, the selector is closed,
  * {@code shutdownLatch} is released and the loop ends.
  */
 public void run() {
        thread = Thread.currentThread();
        startupLatch.countDown();

        // Number of consecutive select() calls that returned early with nothing selected.
        int selectReturnsImmediately = 0;
        Selector selector = this.selector;

        // No selector to poll, so there is nothing to do.
        if (selector == null) {
            return;
        }
        // A select() that blocked for less than 80% of the select timeout is treated as premature.
        final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
        // True when the previous iteration called selector.wakeup() itself (see below).
        boolean wakenupFromLoop = false;
        for (;;) {
            wakenUp.set(false);

            try {
                long beforeSelect = System.nanoTime();
                int selected = select(selector);
                // Only inspect returns with nothing selected that were not caused by a wake-up.
                if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
                    long timeBlocked = System.nanoTime() - beforeSelect;
                    if (timeBlocked < minSelectTimeout) {
                        boolean notConnected = false;
                        // Loop over all keys, as the selector may have been unblocked because of a closed channel.
                        for (SelectionKey key: selector.keys()) {
                            SelectableChannel ch = key.channel();
                            try {
                                if (ch instanceof DatagramChannel && !ch.isOpen() ||
                                        ch instanceof SocketChannel && !((SocketChannel) ch).isConnected() &&
                                                // Only cancel if the connection is not pending
                                                // See https://github.com/netty/netty/issues/2931
                                                !((SocketChannel) ch).isConnectionPending()) {
                                    notConnected = true;
                                    // cancel the key just to be on the safe side
                                    key.cancel();
                                }
                            } catch (CancelledKeyException e) {
                                // ignore
                            }
                        }
                        if (notConnected) {
                            // A closed or unconnected channel explains the early return; do not count it.
                            selectReturnsImmediately = 0;
                        } else {
                            // Note: Thread.interrupted() also clears the thread's interrupt flag.
                            if (Thread.interrupted() && !shutdown) {
                                // Thread was interrupted but NioSelector was not shutdown.
                                // As this is most likely a bug in the handler of the user or its client
                                // library we will log it.
                                //
                                // See https://github.com/netty/netty/issues/2426
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Selector.select() returned prematurely because the I/O thread " +
                                            "has been interrupted. Use shutdown() to shut the NioSelector down.");
                                }
                                selectReturnsImmediately = 0;
                            } else {
                                // Returned before the minSelectTimeout elapsed with nothing selected.
                                // This may be because of a bug in JDK NIO Selector provider, so increment the counter
                                // which we will use later to see if it's really the bug in JDK.
                                selectReturnsImmediately ++;
                            }
                        }
                    } else {
                        selectReturnsImmediately = 0;
                    }
                } else {
                    selectReturnsImmediately = 0;
                }

                if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
                    if (selectReturnsImmediately == 1024) {
                        // The selector returned immediately 1024 times in a row,
                        // so recreate the selector as it seems like we hit the
                        // famous epoll(..) jdk bug.
                        rebuildSelector();
                        // Pick up the selector instance installed by rebuildSelector().
                        selector = this.selector;
                        selectReturnsImmediately = 0;
                        wakenupFromLoop = false;
                        // try to select again
                        continue;
                    }
                } else {
                    // reset counter
                    selectReturnsImmediately = 0;
                }

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                cancelledKeys = 0;
                processTaskQueue();
                selector = this.selector; // processTaskQueue() can call rebuildSelector()

                if (shutdown) {
                    // Clear the field; the local 'selector' still refers to the instance closed below.
                    this.selector = null;

                    // process one time again
                    processTaskQueue();

                    for (SelectionKey k: selector.keys()) {
                        close(k);
                    }

                    try {
                        selector.close();
                    } catch (IOException e) {
                        logger.warn(
                                "Failed to close a selector.", e);
                    }
                    shutdownLatch.countDown();
                    break;
                } else {
                    process(selector);
                }
            } catch (Throwable t) {
                // Log and keep looping; only the shutdown branch above leaves the loop.
                logger.warn(
                        "Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore: the sleep is only a throttle before the next loop iteration.
                }
            }
        }
    }








(2)初始化并启动NioWorker线程:数量大于1个,可配置,一般配置为Runtime.getRuntime().availableProcessors() + 1。
该线程执行的任务为:processTaskQueue()以及process(selector)(处理read和write事件)。

2.初始化DefaultChannelPipeline的3个属性head,tail,name2ctx。

3.然后通过bind(SocketAddress localAddress)方法初始化一个ServerSocketChannel,并注册到boss线程。注意:注册到boss线程是通过Binder这个handler实现的:

关键代码:

// Create the channel through the configured factory, using the boss pipeline.
Channel channel = getFactory().newChannel(bossPipeline);

// Bind the event's channel to localAddress and mirror the outcome onto bindFuture.
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        bindFuture.setSuccess();
                    } else {
                        // Propagate the cause of the failed bind.
                        bindFuture.setFailure(future.getCause());
                    }
                }
            });



最后在NioServerSocketPipelineSink中注册到boss线程

/**
 * Creates a new channel whose pipeline contains a {@link Binder} for
 * {@code localAddress} (followed by the parent handler, if one is set) and
 * returns a future that completes with the outcome recorded by the binder.
 * When the bind fails, the channel is closed before the returned future is
 * marked as failed.
 *
 * @param localAddress the address to bind to; must not be {@code null}
 * @return a future tied to the newly created channel
 * @throws NullPointerException if {@code localAddress} is {@code null}
 */
public ChannelFuture bindAsync(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        Binder bindHandler = new Binder(localAddress);
        ChannelHandler userHandler = getParentHandler();

        // "binder" goes first; the optional parent handler is appended after it.
        ChannelPipeline serverPipeline = pipeline();
        serverPipeline.addLast("binder", bindHandler);
        if (userHandler != null) {
            serverPipeline.addLast("userHandler", userHandler);
        }

        Channel serverChannel = getFactory().newChannel(serverPipeline);
        final ChannelFuture result = new DefaultChannelFuture(serverChannel, false);
        bindHandler.bindFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // Call close on bind failure, then report the cause.
                    result.getChannel().close();
                    result.setFailure(future.getCause());
                    return;
                }
                result.setSuccess();
            }
        });
        return result;
    }

/**
 * Upstream handler that configures a newly opened channel, binds it to
 * {@code localAddress}, and records the bind outcome in {@code bindFuture}.
 * Options whose key starts with {@code "child."} are kept (with the prefix
 * removed) and applied to each child channel when it opens.
 */
private final class Binder extends SimpleChannelUpstreamHandler {

        private final SocketAddress localAddress;
        // Options collected in channelOpen() and applied in childChannelOpen().
        private final Map<String, Object> childOptions =
            new HashMap<String, Object>();
        private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);

        Binder(SocketAddress localAddress) {
            this.localAddress = localAddress;
        }

        /**
         * Sets the pipeline factory and the parent options on the opened
         * channel, forwards the event upstream (even if configuration throws),
         * and then starts the bind.
         */
        @Override
        public void channelOpen(
                ChannelHandlerContext ctx,
                ChannelStateEvent evt) {

            final String childPrefix = "child.";
            try {
                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

                // Partition the configured options into parent and child sets.
                Map<String, Object> configured = getOptions();
                Map<String, Object> parentOptions = new HashMap<String, Object>();
                for (Entry<String, Object> option: configured.entrySet()) {
                    String key = option.getKey();
                    if (key.startsWith(childPrefix)) {
                        // Store child options without their prefix.
                        childOptions.put(key.substring(childPrefix.length()), option.getValue());
                        continue;
                    }
                    if (!"pipelineFactory".equals(key)) {
                        parentOptions.put(key, option.getValue());
                    }
                }

                // Apply parent options.
                evt.getChannel().getConfig().setOptions(parentOptions);
            } finally {
                ctx.sendUpstream(evt);
            }

            ChannelFuture pendingBind = evt.getChannel().bind(localAddress);
            pendingBind.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        bindFuture.setFailure(future.getCause());
                        return;
                    }
                    bindFuture.setSuccess();
                }
            });
        }

        /**
         * Applies the collected child options to the new child channel; a
         * failure is reported through {@code fireExceptionCaught} and the
         * event is forwarded upstream either way.
         */
        @Override
        public void childChannelOpen(
                ChannelHandlerContext ctx,
                ChildChannelStateEvent e) throws Exception {
            try {
                e.getChildChannel().getConfig().setOptions(childOptions);
            } catch (Throwable failure) {
                fireExceptionCaught(e.getChildChannel(), failure);
            }
            ctx.sendUpstream(e);
        }

        /**
         * Marks {@code bindFuture} as failed with the event's cause and
         * forwards the event upstream.
         */
        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e)
                throws Exception {
            bindFuture.setFailure(e.getCause());
            ctx.sendUpstream(e);
        }
    }



    /**
     * Opens a {@link ServerSocketChannel}, switches it to non-blocking mode,
     * creates the channel config from its socket, and finally fires the
     * channel-open event for this instance.
     *
     * @throws ChannelException if the server socket cannot be opened or
     *         cannot be switched to non-blocking mode
     */
    NioServerSocketChannel(
            ChannelFactory factory,
            ChannelPipeline pipeline,
            ChannelSink sink, Boss boss, WorkerPool<NioWorker> workerPool) {

        super(factory, pipeline, sink);
        this.boss = boss;
        this.workerPool = workerPool;

        try {
            socket = ServerSocketChannel.open();
        } catch (IOException openFailure) {
            throw new ChannelException("Failed to open a server socket.", openFailure);
        }

        try {
            socket.configureBlocking(false);
        } catch (IOException modeFailure) {
            // Release the half-initialized socket before reporting the failure.
            try {
                socket.close();
            } catch (IOException closeFailure) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close a partially initialized socket.", closeFailure);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", modeFailure);
        }

        config = new DefaultServerSocketChannelConfig(socket.socket());

        // Fired last, after the socket and config fields have been assigned.
        fireChannelOpen(this);
    }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics