`
lobin
  • 浏览: 386788 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Netty:事件模型

 
阅读更多

事件模型

在分析Netty的事件模型之前,先回忆一下一种编程模型,不是OO编程,也不是函数式编程,而是一种基于事件驱动的编程方式。这种编程方式实际上我们经常接触到,比如UI编程,点击一个按钮就会触发一个动作响应,这都是事件驱动的编程实现。包括页面加载触发的动作,还有Select, Poll, EPoll这种基于事件驱动的IO机制。

 

一个事件模型包括事件源,触发的事件,以及事件触发的动作响应三部分。

 

1、事件源

指的就是谁触发的事件。比如点击按钮就会触发响应的动作,按钮就是一个事件源。页面加载也会触发相应的动作,页面也就是一个事件源。Select, Poll, EPoll中IO也是一个事件源,触发读写事件。

 

在Netty中我们还将讲到Channel,这也是一个事件源,Channel上产生事件或者IO操作。当然Netty中的事件模型并不只是体现在Channel上。

2、事件

3、事件触发的动作

 

Netty在实现Reactor的时候就采用了这种事件驱动模型。关于Reactor,可以参考POSA 2(Pattern-Oriented Software Architecture, Volume 2, 面向模式的软件架构 第2卷),这是一本关于面向模式的软件架构的书籍。

 

想要理解Netty的Reactor模式实现需要对事件驱动这种模型有一定的理解。还有就是多路复用这种技术,比如Java中的Selector,就是一种multiplexor多路复用这种技术。还有就是Linux下的EPoll这种机制。

 

Netty事件模型的核心组件包括EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup、Channel、ChannelPipeline以及ChannelHandler等。

 

在事件模型中,必须要有一个事件处理机制,这个事件处理机制负责接收触发的事件,并分发给对应的事件处理,以便响应相应的动作。

 

在Netty中,EventLoop就是这样的一个角色。

 

Netty中的事件处理机制

1、EventLoopGroup

2、EventLoop

3、EventExecutorGroup

4、EventExecutor

 

EventLoop和EventLoopGroup这两个接口不太好理解。从这两个接口定义来看,它们是is-a的关系,也就是说,EventLoop也是一个EventLoopGroup。除了这两个接口,另外还有一个EventExecutorGroup接口。EventLoopGroup和EventExecutorGroup这两个接口也不太好理解。从这两个接口定义来看,它们也是is-a的关系,也就是说,EventLoopGroup也是一个EventExecutorGroup。和EventExecutorGroup接口对应的,还有一个EventExecutor。这两个也是is-a的关系,也就是说,EventExecutor也是一个EventExecutorGroup。

 

这几个接口之间的关系

EventLoop is-a EventLoopGroup is-a EventExecutorGroup

 

EventLoop is-a OrderedEventExecutor is-a EventExecutor is-a EventExecutorGroup

 

EventLoopGroup中有一个重要的next方法,它返回(或者选择)的是一个EventLoop,而在EventExecutorGroup中也有一个重要的next方法,但它返回(或者选择)的是一个EventExecutor。

 

Netty的这几个核心组件关系看着合理,但挺不好理解的。而且在具体实现的时候有些地方挺怪异的,比如EventLoopGroup的next方法是一个挺重要的方法,它返回一个EventLoop,但在ThreadPerChannelEventLoopGroup的实现中不支持,直接抛出UnsupportedOperationException异常,在OioEventLoopGroup中也是这样。这个方法只在MultithreadEventLoopGroup多线程EventLoopGroup中才有用。

 

因为EventExecutor也是一个EventExecutorGroup,而EventLoop是一个EventLoopGroup,同时EventLoopGroup也是一个EventExecutorGroup,所以在注册Channel时,

 

想要理解Netty的这几个核心组件需要对Java的Executor、ExecutorService、ScheduledExecutorService这几个标准接口以及AbstractExecutorService要有个深入的理解。可以参考Java的线程池实现。

 

EventLoop

EventLoop其实就是一个Reactor。简单的理解就是它会去轮训事件或者IO操作,如果Channel有事件或者IO操作产生,比如有连接过来,或者有数据可读或可写,它负责将这些请求分发给对应的ChannelHandler处理.

 

Netty4定义了一个EventLoop接口,用于处理Channel产生的IO操作。

 

EventLoop有一个inEventLoop方法,从EventExecutor接口中继承过来的,用于判断是否当前线程。

 

如果不是当前线程,EventLoop会启动一个线程任务去负责轮训事件或者IO操作,参考SingleThreadEventExecutor类的execute方法。这里会调用一个startThread方法,这个方法会调用doStartThread方法,在这个方法中,会调用run去不断的轮训。

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

 

 

 

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

 

 

NioEventLoop

非阻塞式IO场景下使用。

NioEventLoop继承了SingleThreadEventLoop,所以NioEventLoop是一个单线程的EventLoop。

 

EpollEventLoop

基于Epoll的IO事件处理机制的IO多路复用场景下使用。Epoll是Linux下提供的基于事件的IO处理机制,所以这个只能在Linux下采用使用。而且Java提供的IO多路复用技术在不同OS下所采用的机制不一样。如果是Linux,采用的是Epoll,如果是Windows,采用的是完成端口。

 

具体EpollEventLoop采用的什么样的机制后面再详细说明。

 

EventLoopGroup

对EventLoop进行分组,定义了一个EventLoopGroup接口,这个接口继承了EventExecutorGroup接口,定义也比较简单。

1、next

    EventLoop next();

 

2、register

    ChannelFuture register(Channel channel);

 

    ChannelFuture register(ChannelPromise promise);

 

    ChannelFuture register(Channel channel, ChannelPromise promise);

 

其中next方法用于从Group中返回(或者选择)一个EventLoop进行Looping。一旦Channel产生了事件或者IO操作,就可以Looping到产生的事件或者IO操作,进而进行处理。register方法用于注册一个Channel和(或者)一个ChannelPromise,在注册的时候,会选择一个EventLoop进行注册,后面在看具体实现的时候可以分析下具体的选择策略。

 

 

EventLoopGroup接口有一个AbstractEventLoopGroup抽象实现。不过并没有具体实现继承了这个抽象类。仅仅定义了一个next抽象方法。这个抽象方法覆盖了父接口EventExecutorGroup的next方法,而对父接口EventLoopGroup的next方法进行了抽象定义。

1、next

    public abstract EventLoop next();

这个设计挺怪异的。虽说Java类并没有像C++那样支持多继承,但接口支持多继承,从这里来看,Java接口的多继承机制也挺怪异的。

 

 

和EventLoopGroup对应的,还有一个EventExecutorGroup。EventLoopGroup也是一个EventExecutorGroup。

EventExecutorGroup

EventExecutorGroup接口扩展了Java的ScheduledExecutorService接口,对于Java程序员来说,应该对这个是了解的,Java中的线程池实现一个是ThreadPoolExecutor,另一个就是ScheduledThreadPoolExecutor,这个是一个可调度的线城池实现,就实现了ScheduledExecutorService接口。

 

ScheduledExecutorService接口定义很简单,就几个方法:

1、schedule

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

 

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

 

2、scheduleAtFixedRate

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

 

3、scheduleWithFixedDelay

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

 

ScheduledExecutorService接口扩展了ExecutorService接口

1、shutdown

    void shutdown();

 

2、shutdownNow

    List<Runnable> shutdownNow();

 

3、isShutdown

    boolean isShutdown();

 

4、isTerminated

    boolean isTerminated();

 

5、awaitTermination

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

 

6、submit

    <T> Future<T> submit(Callable<T> task);

 

    <T> Future<T> submit(Runnable task, T result);

 

    Future<?> submit(Runnable task);

 

7、invokeAll

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

 

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

 

8、invokeAny

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

 

    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

 

ExecutorService接口又扩展了Executor接口

1、execute

    void execute(Runnable command);

 

EventExecutorGroup接口也扩展了自己的一些定义

1、isShuttingDown

    boolean isShuttingDown();

 

2、shutdownGracefully

    Future<?> shutdownGracefully();

 

    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

 

3、terminationFuture

    Future<?> terminationFuture();

 

4、shutdown

    void shutdown();

 

5、shutdownNow

    List<Runnable> shutdownNow();

 

6、next

    EventExecutor next();

 

7、iterator

    Iterator<EventExecutor> iterator();

 

8、submit

    Future<?> submit(Runnable task);

 

    <T> Future<T> submit(Runnable task, T result);

 

    <T> Future<T> submit(Callable<T> task);

 

9、schedule

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

 

    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

 

10、scheduleAtFixedRate

    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

 

11、scheduleWithFixedDelay

    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

 

说到线城池,我们也可以通过实现Executor、ExecutorService或者ScheduledExecutorService接口来实现自己的线城池。且不说实现一个线程池难度如何,从ExecutorService接口定义来看,就要实现不少接口方法。不过可以参考ThreadPoolExecutor、ScheduledThreadPoolExecutor实现,Java中的这两个线城池实现的也不错。

 

EventExecutor

 

 

MultithreadEventLoopGroup

多线程EventLoopGroup。包括:

1、DefaultEventLoopGroup(LocalEventLoopGroup)

2、NioEventLoopGroup

3、EpollEventLoopGroup

 

NioEventLoopGroup

这是一个多线程模型的EventLoopGroup。NioEventLoopGroup会创建指定线程数的EventExecutor,参考MultithreadEventExecutorGroup,NioEventLoopGroup间接继承了MultithreadEventExecutorGroup。

 

默认情况下,也就是在创建NioEventLoopGroup对象的时候使用无参构造方法或者线程数指定0的时候,线程数为逻辑核数*2。

        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

 

Channel

Channel是一个很重要的概念,这是一个很抽象的概念。通常理解来说,Channel是一个类似于通信的信道。

 

从通信的角度看,通信的双方通过这个Channel进行对话,双方通过这个Channel发送和接受对方的信息。从这个角度看,双方进行通信似乎只需要这一个Channel就可以通信,这的确是可以的,不过在同一时刻只能是单向通信,如果需要同时双向通信的话,从逻辑上来讲,需要两个Channel。

 

如果两台机器需要进行通信,需要通过一个Socket进行通信。从Socket角度看,双方通过一个Socket发送和接收对方的信息,所以Socket就是这样的一个Channel。另外对于TCP来说,通信的双方如果需要通信的话,在通信之前需要建立一个连接。如果两台机器需要进行TCP通信的话,也就需要两个Socket,一台机器对应有一个Socket。实际上至少会有两个Socket,通常会有三个Socket,一方发起连接,另一方接收到这个连接后,也会有一个Socket,这个Socket代表连接方的Socket。在客户端服务器编程中,也就是我们习惯说的服务端Socket和客户端Socket。这两个概念比较容易混淆,需要注意的是,通信的双方并不是在各自创建的Socket上发送和接收对方的信息,一方接收到另一方发起的连接,会得到一个代表另一方连接的Socket,连接方通过自己创建的Socket和对方通信,这个Socket代表的是对方的Socket,或者说其实代表的是服务端Socket,在这个Socket上发送和接收另一方的信息,另一方通过得到的这个代表连接方连接的Socket和连接方通信,发送和接收连接方的信息。

 

另外需要注意的是,通信的双方Socket其实是对等的,并不存在所谓的客户端和服务端Socket的区别。Linux在实现Socket这块体现的非常好的。

 

在一些Socket实现中,尤其是在一些编程语言中,在实现Socket这块把Socket分为客户端和服务端Socket,比如Java,这其实是结合了面向对象思想,从OO的角度看待Socket。引入了客户端/服务端这种模式思想。

 

Channel是Netty中的一个核心组件。Netty定义了一个Channel接口。Channel打开后需要注册到EventLoop中,先是调用EventLoopGroup的register注册方法,这个方法会调用next方法,选择一个EventLoop,然后调用EventLoop的register注册方法将这个Channel封装成一个ChannelPromise进行注册。最后还是会调回到Channel的register注册方法。参考AbstractChannel的register方法。

 

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

 

这里会调用AbstractChannel的register0方法。在这个方法中会调用doRegister方法,最后会调用到具体的Channel的doRegister方法进行注册。比如在NioServerSocketChannel、NioSocketChannel这些非阻塞式(NIO)Channel中,doRegister方法实现统一在AbstractNioChannel中实现。参考AbstractNioChannel的doRegister方法。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

 

这里会调用Java的Channel(这里应该是SelectableChannel)的register方法进行注册。初始注册时,interest set指定为0,attachment保存的是Netty的要注册的Channel。

 

Channel注册后,如果有注册的ChannelHandler还有未触发调用的动作还在pending等待队列中,会调用DefaultChannelPipeline的invokeHandlerAddedIfNeeded方法,这个方法会调用callHandlerAddedForAllHandlers方法按照顺序执行这些触发调用的动作。同时还会触发Channel注册时要调用的动作,这里Channel注册也是一个事件,Channel注册后,会触发Channel注册事件。这里会调用ChannelPipeline的fireChannelRegistered方法,这个方法会调用注册的ChannelHandler的channelRegistered方法。

 

Unsafe

在Channel接口内还定义了一个Unsafe接口。这个接口不可以在外部用户程序中调用。

     * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods

     * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the

     * following methods:

 

NioServerSocketChannel

这其实是一个连接通道,对应接收方的Socket,负责建立和连接方的连接。需要注意的是,Java在实现Socket这块把Socket分为客户端和服务端Socket,NioServerSocketChannel对应的是服务器端的ServerSocket。

 

这个Channel其实是一个比较特殊的Channel。首先它是非阻塞的(NIO),另外它是一个ServerSocket的Channel,这个Channel只能在这个上面接收客户端的连接,获取和客户端通信的Channel。不过Netty在实现NioServerSocketChannel的时候,确是直接继承了AbstractNioMessageChannel类。这样就得在实现AbstractNioMessageChannel的doWriteMessage方法,另外AbstractNioMessageChannel又继承了AbstractNioChannel类,就得实现AbstractNioChannel的doFinishConnect和doConnect方法,还有AbstractNioChannel又继承了AbstractChannel类,又得实现AbstractChannel的doDisconnect方法,Netty不得不在这些方法中直接报UnsupportedOperationException。

 

NioServerSocketChannel中主要就一个doReadMessages方法。这个方法调用Java的ServerSocketChannel的accept方法获取和客户端连接的通道SocketChannel,通过这个Channel和客户端通信。

 

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

 

 

ChannelPipeline

在DefaultChannelPipeline中还维护了一个注册添加、删除ChannelHandler的等待队列,也就是一个pending队列,这个等待队列是一个单链表。在对应的Channel还没有被注册时,向ChannelPipeline中注册添加、删除ChannelHandler后,不会调用对应ChannelHandler的handlerAdded和handlerRemoved方法,而是把这个要触发调用的动作放在一个pending等待队列中。将这个要触发调用的动作封装成一个PendingHandlerCallback放在这个等待队列中,如果是注册添加ChannelHandler,会将这个ChannelHandler对应的ChannelHandlerContext封装成一个PendingHandlerAddedTask,如果是删除注册的ChannelHandler,会将这个ChannelHandler对应的ChannelHandlerContext封装成一个PendingHandlerRemovedTask,然后放在队列中。

 

这个等待队列只有一个对头指向而没有一个队尾,在向这个队列中添加时,需要遍历找到队尾,才能在队尾进行添加。

 

ChannelHandler

1、handlerAdded

2、handlerRemoved

3、exceptionCaught

在注册添加、删除ChannelHandler或者报异常时会触发调用这几个方法。比如通过ChannelPipeline注册添加ChannelHandler时,会调用ChannelHandler的handlerAdded方法。在删除注册的ChannelHandler时,会调用handlerRemoved方法。

 

ByteBuf

Netty实现了自己的一套Buffer。

Netty的Buffer定义为一个可以同时随机和线性访问的有限序列。

 

 * A random and sequential accessible sequence of zero or more bytes (octets).

 * This interface provides an abstract view for one or more primitive byte

 * arrays ({@code byte[]}) and {@linkplain ByteBuffer NIO buffers}.

 

Netty的Buffer支持扩容或者缩容。

支持discard。

支持线性读写操作:readerIndex和writerIndex

 

和Java中的ByteBuffer比较

Java中的Buffer就是我们说的读写缓冲。

 

我们在进行IO操作的时候,其实就涉及到这个东西了,只是这个IO缓冲一般我们不会直接去操作,所以平常我们不去深究的话,就没有感觉到读写缓冲的存在。这个东西其实在我们调用printf标准函数或者Java中的System.out.println的时候就涉及到写缓冲了,以Java为例,调用println打印输出的时候,输出的内容首先写入到一个写缓冲中,然后才会flush刷入输出到控制台上。

 

还有我们在读取数据的时候,比如我们读取文件,或者接收从客户端发送过来的数据,我们通常也会申请一块缓冲,将读取出来的数据缓存在这块缓冲区中。

 

这些都是我们说的缓冲,不过我们说的这些都比较简单,基本上仅仅是申请了一块内存缓存区域。

 

Java中的Buffer跟我们说的缓冲没有本质区别。不过在我们的基础上,有了更明确的定义和实现。

 * A container for data of a specific primitive type.

 *

 * <p> A buffer is a linear, finite sequence of elements of a specific

 * primitive type.  Aside from its content, the essential properties of a

 * buffer are its capacity, limit, and position: </p>

 *

 * <blockquote>

 *

 *   <p> A buffer's <i>capacity</i> is the number of elements it contains.  The

 *   capacity of a buffer is never negative and never changes.  </p>

 *

 *   <p> A buffer's <i>limit</i> is the index of the first element that should

 *   not be read or written.  A buffer's limit is never negative and is never

 *   greater than its capacity.  </p>

 *

 *   <p> A buffer's <i>position</i> is the index of the next element to be

 *   read or written.  A buffer's position is never negative and is never

 *   greater than its limit.  </p>

 *

 * </blockquote>

Java中的Buffer定义为一个线性的有限序列。

 

Buffer的几个重要属性

1、capacity

2、position

3、limit

4、mark

 

其中capacity表示Buffer的容量大小,position表示开始读写的位置,比如读的时候从哪里开始读,写入的时候从哪里开始写入,每次读写后都会更新position的值。limit表示读写的时候不能超过limit的限制,mark记录一个标记位置,如果reset的话,position将被重新设置为mark的值。

 

这几个属性必须满足以下条件:

mark <= position <= limit <= capacity

 

Java中的Buffer不支持扩容或者缩容,容量在初始化的时候就是指定了的。

 

mark

记录当前位置,将mark设置为当前位置。后面调用reset的话,position将被重新设置为mark的值。

 

flip

将limit设置为当前position的值,position重置为0,mark重置为-1。

 

rewind

调用rewind将使Buffer倒回到一个"初始"的状态,这个"初始"的状态并不是初始化时候的状态,而是会position重置为0,mark重置为-1。

 

ByteBuffer

这个是Buffer的一个字节序的抽象实现。

slice

调用slice会创建一个新的ByteBuffer实例,但这个ByteBuffer实例会共享之前这个ByteBuffer的内容。但新的ByteBuffer实例内容从之前这个ByteBuffer的position开始,也就是新的ByteBuffer实例的limit和capacity从前这个ByteBuffer的position开始算起(newLimit = limit - position, newCapacity = capacity - position),且新的ByteBuffer实例的offset被设置为之前的ByteBuffer的position。

 

 

public class NioEventLoop_ServerTest {

	private Selector selector(NioEventLoop eventLoop) throws NoSuchFieldException {
		Class<?> clazz = eventLoop.getClass();
		Field field = null;
		try {
			field = clazz.getDeclaredField("selector");
		} catch (NoSuchFieldException e) {
			field = clazz.getDeclaredField("unwrappedSelector");
		}

		field.setAccessible(true);
		try {
			return (Selector) field.get(eventLoop);
		} catch (IllegalAccessException e) {
			throw new NoSuchFieldException(e.getMessage());
		}
	}
	
	private class SocketChannelHandler implements NioTask<SelectableChannel> {

		private NioEventLoop eventLoop;
		
		public SocketChannelHandler(NioEventLoop eventLoop) {
			this.eventLoop = eventLoop;
		}
		
		@Override
		public void channelReady(SelectableChannel ch, SelectionKey key) throws Exception {
			System.out.println("channelReady...");
			
			if (key.isAcceptable()) {
				ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
				try {
					SocketChannel socketChannel = serverSocketChannel.accept();
					if (socketChannel != null) {
						socketChannel.configureBlocking(false);

						eventLoop.register(socketChannel, SelectionKey.OP_READ, this);
					}
				} catch (ClosedChannelException e) {
					e.printStackTrace();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			
			if (key.isReadable()) {
				SocketChannel socketChannel = (SocketChannel) key.channel();
				ByteBuffer buffer = ByteBuffer.allocate(256);
				try {
					int nbytes = socketChannel.read(buffer);
					System.out.println(nbytes + ", " + buffer.toString());
					buffer.flip();

					byte[] bytes = new byte[buffer.remaining()];
					buffer.get(bytes);
					System.out.println(new String(bytes));
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		@Override
		public void channelUnregistered(SelectableChannel ch, Throwable cause) throws Exception {
			System.out.println("channelUnregistered...");
		}
		
	}

	@Test
	public void test() throws Exception {
		System.setProperty("io.netty.noKeySetOptimization", "true");

		SelectorProvider selectorProvider = SelectorProvider.provider();
		SelectStrategyFactory selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE;
		RejectedExecutionHandler rejectedExecutionHandler = RejectedExecutionHandlers.reject();

		Executor executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
		final NioEventLoop eventLoop = new NioEventLoop(null, executor, selectorProvider,
				selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler);

		Selector selector = selector(eventLoop);
		System.out.println(selector.getClass());

		ServerSocketChannel channel0 = null;
		try {
			channel0 = ServerSocketChannel.open();
			System.out.println(channel0.getClass().getName());
		} catch (IOException e) {
			e.printStackTrace();
		}

		try {
			channel0.configureBlocking(false);
		} catch (IOException e) {
			e.printStackTrace();
		}
		try {
			channel0.socket().bind(new InetSocketAddress(1239));
		} catch (IOException e) {
			e.printStackTrace();
		}

//		try {
//			SelectionKey key = channel0.register(selector, SelectionKey.OP_ACCEPT);
//			System.out.println(key.interestOps());
//		} catch (ClosedChannelException e) {
//			e.printStackTrace();
//		}
		
		
		eventLoop.register(channel0, SelectionKey.OP_ACCEPT, new SocketChannelHandler(eventLoop));


		Runnable worker = new Runnable() {

			@Override
			public void run() {
				System.out.println("running...");
			}
		};
		eventLoop.execute(worker);

		synchronized (this) {
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

 

 

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics