事件模型
在分析Netty的事件模型之前,先回忆一下一种编程模型,不是OO编程,也不是函数式编程,而是一种基于事件驱动的编程方式。这种编程方式实际上我们经常接触到,比如UI编程,点击一个按钮就会触发一个动作响应,这都是事件驱动的编程实现。包括页面加载触发的动作,还有Select, Poll, EPoll这种基于事件驱动的IO机制。
一个事件模型包括事件源,触发的事件,以及事件触发的动作响应三部分。
1、事件源
指的就是谁触发的事件。比如点击按钮就会触发响应的动作,按钮就是一个事件源。页面加载也会触发相应的动作,页面也就是一个事件源。Select, Poll, EPoll中IO也是一个事件源,触发读写事件。
在Netty中我们还将讲到Channel,这也是一个事件源,Channel上产生事件或者IO操作。当然Netty中的事件模型并不只是体现在Channel上。
2、事件
3、事件触发的动作
Netty在实现Reactor的时候就采用了这种事件驱动模型。关于Reactor,可以参考POSA 2(Pattern-Oriented Software Architecture, Volume 2, 面向模式的软件架构 第2卷),这是一本关于面向模式的软件架构的书籍。
想要理解Netty的Reactor模式实现需要对事件驱动这种模型有一定的理解。还有就是多路复用这种技术,比如Java中的Selector,就是一种multiplexor多路复用这种技术。还有就是Linux下的EPoll这种机制。
Netty事件模型的核心组件包括EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup、Channel、ChannelPipeline以及ChannelHandler等。
在事件模型中,必须要有一个事件处理机制,这个事件处理机制负责接收触发的事件,并分发给对应的事件处理,以便响应相应的动作。
在Netty中,EventLoop就是这样的一个角色。
Netty中的事件处理机制
1、EventLoopGroup
2、EventLoop
3、EventExecutorGroup
4、EventExecutor
EventLoop和EventLoopGroup这两个接口不太好理解。从这两个接口定义来看,它们是is-a的关系,也就是说,EventLoop也是一个EventLoopGroup。除了这两个接口,另外还有一个EventExecutorGroup接口。EventLoopGroup和EventExecutorGroup这两个接口也不太好理解。从这两个接口定义来看,它们也是is-a的关系,也就是说,EventLoopGroup也是一个EventExecutorGroup。和EventExecutorGroup接口对应的,还有一个EventExecutor。这两个也是is-a的关系,也就是说,EventExecutor也是一个EventExecutorGroup。
这几个接口之间的关系
EventLoop is-a EventLoopGroup is-a EventExecutorGroup
EventLoop is-a OrderedEventExecutor is-a EventExecutor is-a EventExecutorGroup
EventLoopGroup中有一个重要的next方法,它返回(或者选择)的是一个EventLoop,而在EventExecutorGroup中也有一个重要的next方法,但它返回(或者选择)的是一个EventExecutor。
Netty的这几个核心组件关系看着合理,但挺不好理解的。而且在具体实现的时候有些地方挺怪异的,比如EventLoopGroup的next方法是一个挺重要的方法,它返回一个EventLoop,但在ThreadPerChannelEventLoopGroup的实现中不支持,直接抛出UnsupportedOperationException异常,在OioEventLoopGroup中也是这样。这个方法只在MultithreadEventLoopGroup多线程EventLoopGroup中才有用。
因为EventExecutor也是一个EventExecutorGroup,而EventLoop是一个EventLoopGroup,同时EventLoopGroup也是一个EventExecutorGroup,所以在注册Channel时,
想要理解Netty的这几个核心组件需要对Java的Executor、ExecutorService、ScheduledExecutorService这几个标准接口以及AbstractExecutorService要有个深入的理解。可以参考Java的线程池实现。
EventLoop
EventLoop其实就是一个Reactor。简单的理解就是它会去轮训事件或者IO操作,如果Channel有事件或者IO操作产生,比如有连接过来,或者有数据可读或可写,它负责将这些请求分发给对应的ChannelHandler处理.
Netty4定义了一个EventLoop接口,用于处理Channel产生的IO操作。
EventLoop有一个inEventLoop方法,从EventExecutor接口中继承过来的,用于判断是否当前线程。
如果不是当前线程,EventLoop会启动一个线程任务去负责轮训事件或者IO操作,参考SingleThreadEventExecutor类的execute方法。这里会调用一个startThread方法,这个方法会调用doStartThread方法,在这个方法中,会调用run去不断的轮训。
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }
NioEventLoop
非阻塞式IO场景下使用。
NioEventLoop继承了SingleThreadEventLoop,所以NioEventLoop是一个单线程的EventLoop。
EpollEventLoop
基于Epoll的IO事件处理机制的IO多路复用场景下使用。Epoll是Linux下提供的基于事件的IO处理机制,所以这个只能在Linux下采用使用。而且Java提供的IO多路复用技术在不同OS下所采用的机制不一样。如果是Linux,采用的是Epoll,如果是Windows,采用的是完成端口。
具体EpollEventLoop采用的什么样的机制后面再详细说明。
EventLoopGroup
对EventLoop进行分组,定义了一个EventLoopGroup接口,这个接口继承了EventExecutorGroup接口,定义也比较简单。
1、next
EventLoop next();
2、register
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
ChannelFuture register(Channel channel, ChannelPromise promise);
其中next方法用于从Group中返回(或者选择)一个EventLoop进行Looping。一旦Channel产生了事件或者IO操作,就可以Looping到产生的事件或者IO操作,进而进行处理。register方法用于注册一个Channel和(或者)一个ChannelPromise,在注册的时候,会选择一个EventLoop进行注册,后面在看具体实现的时候可以分析下具体的选择策略。
EventLoopGroup接口有一个AbstractEventLoopGroup抽象实现。不过并没有具体实现继承了这个抽象类。仅仅定义了一个next抽象方法。这个抽象方法覆盖了父接口EventExecutorGroup的next方法,而对父接口EventLoopGroup的next方法进行了抽象定义。
1、next
public abstract EventLoop next();
这个设计挺怪异的。虽说Java类并没有像C++那样支持多继承,但接口支持多继承,从这里来看,Java接口的多继承机制也挺怪异的。
和EventLoopGroup对应的,还有一个EventExecutorGroup。EventLoopGroup也是一个EventExecutorGroup。
EventExecutorGroup
EventExecutorGroup接口扩展了Java的ScheduledExecutorService接口,对于Java程序员来说,应该对这个是了解的,Java中的线程池实现一个是ThreadPoolExecutor,另一个就是ScheduledThreadPoolExecutor,这个是一个可调度的线城池实现,就实现了ScheduledExecutorService接口。
ScheduledExecutorService接口定义很简单,就几个方法:
1、schedule
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
2、scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
3、scheduleWithFixedDelay
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
ScheduledExecutorService接口扩展了ExecutorService接口
1、shutdown
void shutdown();
2、shutdownNow
List<Runnable> shutdownNow();
3、isShutdown
boolean isShutdown();
4、isTerminated
boolean isTerminated();
5、awaitTermination
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
6、submit
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
7、invokeAll
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
8、invokeAny
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
ExecutorService接口又扩展了Executor接口
1、execute
void execute(Runnable command);
EventExecutorGroup接口也扩展了自己的一些定义
1、isShuttingDown
boolean isShuttingDown();
2、shutdownGracefully
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
3、terminationFuture
Future<?> terminationFuture();
4、shutdown
void shutdown();
5、shutdownNow
List<Runnable> shutdownNow();
6、next
EventExecutor next();
7、iterator
Iterator<EventExecutor> iterator();
8、submit
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
9、schedule
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
10、scheduleAtFixedRate
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
11、scheduleWithFixedDelay
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
说到线城池,我们也可以通过实现Executor、ExecutorService或者ScheduledExecutorService接口来实现自己的线城池。且不说实现一个线程池难度如何,从ExecutorService接口定义来看,就要实现不少接口方法。不过可以参考ThreadPoolExecutor、ScheduledThreadPoolExecutor实现,Java中的这两个线城池实现的也不错。
EventExecutor
MultithreadEventLoopGroup
多线程EventLoopGroup。包括:
1、DefaultEventLoopGroup(LocalEventLoopGroup)
2、NioEventLoopGroup
3、EpollEventLoopGroup
NioEventLoopGroup
这是一个多线程模型的EventLoopGroup。NioEventLoopGroup会创建指定线程数的EventExecutor,参考MultithreadEventExecutorGroup,NioEventLoopGroup间接继承了MultithreadEventExecutorGroup。
默认情况下,也就是在创建NioEventLoopGroup对象的时候使用无参构造方法或者线程数指定0的时候,线程数为逻辑核数*2。
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
Channel
Channel是一个很重要的概念,这是一个很抽象的概念。通常理解来说,Channel是一个类似于通信的信道。
从通信的角度看,通信的双方通过这个Channel进行对话,双方通过这个Channel发送和接受对方的信息。从这个角度看,双方进行通信似乎只需要这一个Channel就可以通信,这的确是可以的,不过在同一时刻只能是单向通信,如果需要同时双向通信的话,从逻辑上来讲,需要两个Channel。
如果两台机器需要进行通信,需要通过一个Socket进行通信。从Socket角度看,双方通过一个Socket发送和接收对方的信息,所以Socket就是这样的一个Channel。另外对于TCP来说,通信的双方如果需要通信的话,在通信之前需要建立一个连接。如果两台机器需要进行TCP通信的话,也就需要两个Socket,一台机器对应有一个Socket。实际上至少会有两个Socket,通常会有三个Socket,一方发起连接,另一方接收到这个连接后,也会有一个Socket,这个Socket代表连接方的Socket。在客户端服务器编程中,也就是我们习惯说的服务端Socket和客户端Socket。这两个概念比较容易混淆,需要注意的是,通信的双方并不是在各自创建的Socket上发送和接收对方的信息,一方接收到另一方发起的连接,会得到一个代表另一方连接的Socket,连接方通过自己创建的Socket和对方通信,这个Socket代表的是对方的Socket,或者说其实代表的是服务端Socket,在这个Socket上发送和接收另一方的信息,另一方通过得到的这个代表连接方连接的Socket和连接方通信,发送和接收连接方的信息。
另外需要注意的是,通信的双方Socket其实是对等的,并不存在所谓的客户端和服务端Socket的区别。Linux在实现Socket这块体现的非常好的。
在一些Socket实现中,尤其是在一些编程语言中,在实现Socket这块把Socket分为客户端和服务端Socket,比如Java,这其实是结合了面向对象思想,从OO的角度看待Socket。引入了客户端/服务端这种模式思想。
Channel是Netty中的一个核心组件。Netty定义了一个Channel接口。Channel打开后需要注册到EventLoop中,先是调用EventLoopGroup的register注册方法,这个方法会调用next方法,选择一个EventLoop,然后调用EventLoop的register注册方法将这个Channel封装成一个ChannelPromise进行注册。最后还是会调回到Channel的register注册方法。参考AbstractChannel的register方法。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
这里会调用AbstractChannel的register0方法。在这个方法中会调用doRegister方法,最后会调用到具体的Channel的doRegister方法进行注册。比如在NioServerSocketChannel、NioSocketChannel这些非阻塞式(NIO)Channel中,doRegister方法实现统一在AbstractNioChannel中实现。参考AbstractNioChannel的doRegister方法。
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
这里会调用Java的Channel(这里应该是SelectableChannel)的register方法进行注册。初始注册时,interest set指定为0,attachment保存的是Netty的要注册的Channel。
Channel注册后,如果有注册的ChannelHandler还有未触发调用的动作还在pending等待队列中,会调用DefaultChannelPipeline的invokeHandlerAddedIfNeeded方法,这个方法会调用callHandlerAddedForAllHandlers方法按照顺序执行这些触发调用的动作。同时还会触发Channel注册时要调用的动作,这里Channel注册也是一个事件,Channel注册后,会触发Channel注册事件。这里会调用ChannelPipeline的fireChannelRegistered方法,这个方法会调用注册的ChannelHandler的channelRegistered方法。
Unsafe
在Channel接口内还定义了一个Unsafe接口。这个接口不可以在外部用户程序中调用。
* <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods:
NioServerSocketChannel
这其实是一个连接通道,对应接收方的Socket,负责建立和连接方的连接。需要注意的是,Java在实现Socket这块把Socket分为客户端和服务端Socket,NioServerSocketChannel对应的是服务器端的ServerSocket。
这个Channel其实是一个比较特殊的Channel。首先它是非阻塞的(NIO),另外它是一个ServerSocket的Channel,这个Channel只能在这个上面接收客户端的连接,获取和客户端通信的Channel。不过Netty在实现NioServerSocketChannel的时候,确是直接继承了AbstractNioMessageChannel类。这样就得在实现AbstractNioMessageChannel的doWriteMessage方法,另外AbstractNioMessageChannel又继承了AbstractNioChannel类,就得实现AbstractNioChannel的doFinishConnect和doConnect方法,还有AbstractNioChannel又继承了AbstractChannel类,又得实现AbstractChannel的doDisconnect方法,Netty不得不在这些方法中直接报UnsupportedOperationException。
NioServerSocketChannel中主要就一个doReadMessages方法。这个方法调用Java的ServerSocketChannel的accept方法获取和客户端连接的通道SocketChannel,通过这个Channel和客户端通信。
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
ChannelPipeline
在DefaultChannelPipeline中还维护了一个注册添加、删除ChannelHandler的等待队列,也就是一个pending队列,这个等待队列是一个单链表。在对应的Channel还没有被注册时,向ChannelPipeline中注册添加、删除ChannelHandler后,不会调用对应ChannelHandler的handlerAdded和handlerRemoved方法,而是把这个要触发调用的动作放在一个pending等待队列中。将这个要触发调用的动作封装成一个PendingHandlerCallback放在这个等待队列中,如果是注册添加ChannelHandler,会将这个ChannelHandler对应的ChannelHandlerContext封装成一个PendingHandlerAddedTask,如果是删除注册的ChannelHandler,会将这个ChannelHandler对应的ChannelHandlerContext封装成一个PendingHandlerRemovedTask,然后放在队列中。
这个等待队列只有一个对头指向而没有一个队尾,在向这个队列中添加时,需要遍历找到队尾,才能在队尾进行添加。
ChannelHandler
1、handlerAdded
2、handlerRemoved
3、exceptionCaught
在注册添加、删除ChannelHandler或者报异常时会触发调用这几个方法。比如通过ChannelPipeline注册添加ChannelHandler时,会调用ChannelHandler的handlerAdded方法。在删除注册的ChannelHandler时,会调用handlerRemoved方法。
ByteBuf
Netty实现了自己的一套Buffer。
Netty的Buffer定义为一个可以同时随机和线性访问的有限序列。
* A random and sequential accessible sequence of zero or more bytes (octets).
* This interface provides an abstract view for one or more primitive byte
* arrays ({@code byte[]}) and {@linkplain ByteBuffer NIO buffers}.
Netty的Buffer支持扩容或者缩容。
支持discard。
支持线性读写操作:readerIndex和writerIndex
和Java中的ByteBuffer比较
Java中的Buffer就是我们说的读写缓冲。
我们在进行IO操作的时候,其实就涉及到这个东西了,只是这个IO缓冲一般我们不会直接去操作,所以平常我们不去深究的话,就没有感觉到读写缓冲的存在。这个东西其实在我们调用printf标准函数或者Java中的System.out.println的时候就涉及到写缓冲了,以Java为例,调用println打印输出的时候,输出的内容首先写入到一个写缓冲中,然后才会flush刷入输出到控制台上。
还有我们在读取数据的时候,比如我们读取文件,或者接收从客户端发送过来的数据,我们通常也会申请一块缓冲,将读取出来的数据缓存在这块缓冲区中。
这些都是我们说的缓冲,不过我们说的这些都比较简单,基本上仅仅是申请了一块内存缓存区域。
Java中的Buffer跟我们说的缓冲没有本质区别。不过在我们的基础上,有了更明确的定义和实现。
* A container for data of a specific primitive type.
*
* <p> A buffer is a linear, finite sequence of elements of a specific
* primitive type. Aside from its content, the essential properties of a
* buffer are its capacity, limit, and position: </p>
*
* <blockquote>
*
* <p> A buffer's <i>capacity</i> is the number of elements it contains. The
* capacity of a buffer is never negative and never changes. </p>
*
* <p> A buffer's <i>limit</i> is the index of the first element that should
* not be read or written. A buffer's limit is never negative and is never
* greater than its capacity. </p>
*
* <p> A buffer's <i>position</i> is the index of the next element to be
* read or written. A buffer's position is never negative and is never
* greater than its limit. </p>
*
* </blockquote>
Java中的Buffer定义为一个线性的有限序列。
Buffer的几个重要属性
1、capacity
2、position
3、limit
4、mark
其中capacity表示Buffer的容量大小,position表示开始读写的位置,比如读的时候从哪里开始读,写入的时候从哪里开始写入,每次读写后都会更新position的值。limit表示读写的时候不能超过limit的限制,mark记录一个标记位置,如果reset的话,position将被重新设置为mark的值。
这几个属性必须满足以下条件:
mark <= position <= limit <= capacity
Java中的Buffer不支持扩容或者缩容,容量在初始化的时候就是指定了的。
mark
记录当前位置,将mark设置为当前位置。后面调用reset的话,position将被重新设置为mark的值。
flip
将limit设置为当前position的值,position重置为0,mark重置为-1。
rewind
调用rewind将使Buffer倒回到一个"初始"的状态,这个"初始"的状态并不是初始化时候的状态,而是会position重置为0,mark重置为-1。
ByteBuffer
这个是Buffer的一个字节序的抽象实现。
slice
调用slice会创建一个新的ByteBuffer实例,但这个ByteBuffer实例会共享之前这个ByteBuffer的内容。但新的ByteBuffer实例内容从之前这个ByteBuffer的position开始,也就是新的ByteBuffer实例的limit和capacity从前这个ByteBuffer的position开始算起(newLimit = limit - position, newCapacity = capacity - position),且新的ByteBuffer实例的offset被设置为之前的ByteBuffer的position。
public class NioEventLoop_ServerTest { private Selector selector(NioEventLoop eventLoop) throws NoSuchFieldException { Class<?> clazz = eventLoop.getClass(); Field field = null; try { field = clazz.getDeclaredField("selector"); } catch (NoSuchFieldException e) { field = clazz.getDeclaredField("unwrappedSelector"); } field.setAccessible(true); try { return (Selector) field.get(eventLoop); } catch (IllegalAccessException e) { throw new NoSuchFieldException(e.getMessage()); } } private class SocketChannelHandler implements NioTask<SelectableChannel> { private NioEventLoop eventLoop; public SocketChannelHandler(NioEventLoop eventLoop) { this.eventLoop = eventLoop; } @Override public void channelReady(SelectableChannel ch, SelectionKey key) throws Exception { System.out.println("channelReady..."); if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { socketChannel.configureBlocking(false); eventLoop.register(socketChannel, SelectionKey.OP_READ, this); } } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(256); try { int nbytes = socketChannel.read(buffer); System.out.println(nbytes + ", " + buffer.toString()); buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); System.out.println(new String(bytes)); } catch (IOException e) { e.printStackTrace(); } } } @Override public void channelUnregistered(SelectableChannel ch, Throwable cause) throws Exception { System.out.println("channelUnregistered..."); } } @Test public void test() throws Exception { System.setProperty("io.netty.noKeySetOptimization", "true"); SelectorProvider selectorProvider = SelectorProvider.provider(); SelectStrategyFactory selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE; RejectedExecutionHandler rejectedExecutionHandler = RejectedExecutionHandlers.reject(); Executor executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass())); final NioEventLoop eventLoop = new NioEventLoop(null, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler); Selector selector = selector(eventLoop); System.out.println(selector.getClass()); ServerSocketChannel channel0 = null; try { channel0 = ServerSocketChannel.open(); System.out.println(channel0.getClass().getName()); } catch (IOException e) { e.printStackTrace(); } try { channel0.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } try { channel0.socket().bind(new InetSocketAddress(1239)); } catch (IOException e) { e.printStackTrace(); } // try { // SelectionKey key = channel0.register(selector, SelectionKey.OP_ACCEPT); // System.out.println(key.interestOps()); // } catch (ClosedChannelException e) { // e.printStackTrace(); // } eventLoop.register(channel0, SelectionKey.OP_ACCEPT, new SocketChannelHandler(eventLoop)); Runnable worker = new Runnable() { @Override public void run() { System.out.println("running..."); } }; eventLoop.execute(worker); synchronized (this) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
相关推荐
Netty服务器线程模型概览_线程模型
默认支持TCP, 可扩展的编解码器支持基于责任链模型零依赖文献资料例子 tcp_server redis_cli 上网样本 快速开始package mainimport ("fmt""strings""github....{// child pipeline initializervar childInitializer = ...
深入Hotspot源码与Linux内核理解NIO与Netty线程模型
study-netty netty 学习之路 项目笔记 项目说明 1、所谓BIO编程,就是使用JDK1.4之前的...这实际上就是最简化的reactor线程模型,实际上netty使用也是这种模型,只不过稍微复杂了一点点。 accpetor thread只负责与clie
Netty框架之异步事件驱动模型
timplus-smack-tcp-netty 使用Netty网络框架的TCP smack连接的实现。 该库的目的是用于在现有实现与TIM +服务提供者之间建立... 使用Netty asncy / non-blocking模型,可以大大减少线程,并更有效地使用系统资源。
Netty的线程模型,将IO操作和业务逻辑清晰地分离,使开发者可以专注于业务逻辑的开发。此外,Netty还提供了丰富的组件和工具,如Bootstrap、Channel、Selector、NioEventLoop等,让网络编程更加简单和高效。通过学习...
简介:基于netty异步库开发,one epoll per thread模型,性能强悍。支持随机请求、支持配置线程数、客户端个数、支持压力山大发送模式。支持返回结果验证。支持qps、延迟、最大连接数等统计。支持测试报告邮件自动...
说明网络线程模型这个工程主要是 用java nio 实现了netty框架的线程模型。依赖jdk1.5 以上测试下载代码git clone https://github.com/zhang-xiang-career/s-netty.git用IDE打开运行study.netty.Start.main() telnet ...
Netty核心精讲之Reactor线程模型源码分析 Netty核心精讲之Reactor线程模型源码分析
第一章:Netty介绍 第二章:第一个Netty程序 第三章:Netty核心概念 ...第十三章:通过UDP广播事件 第十四章:实现自定义的编码解码器 第十五章:选择正确的线程模型 第十六章:从EventLoop取消注册和重新注册
第72讲:Netty线程模型深度解读与架构设计原则 第73讲:Netty底层架构系统总结与应用实践 第74讲:Netty对于异步读写操作的架构思想与观察者模式的重要应用 第75讲:适配器模式与模板方法模式在入站处理器中的...
再根据本人实际学习体验总结而成。本部分内容可能不那么全面,但是我尽量挑选Netty中我认为比较重要的部分做讲解。
这是一本详细介绍了netty线程模型的书籍,包括 nio核心
05.Netty线程模型.rar
Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持Netty的使用场景:1)互联网行业:在分布式系统中,各个节
第一部分:现在开始 1. Netty 与Java NIO APIs ...13. 通过UDP协议广播事件 第四部分:高级应用 14. 实现一个自定义的编解码器 15. 选择适当的线程模型 16. 采用EventLoop实现注销与再次注册 17. 研究案例
NULL 博文链接:https://qq54903099.iteye.com/blog/1947916
在理论方面,讲解了Netty的逻辑架构模型和核心类库的设计原理,而且对Netty在大数据、互联网、游戏行业的应用进行了深入分析;实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地...
在理论方面,讲解了Netty的逻辑架构模型和核心类库的设计原理,而且对Netty在大数据、互联网、游戏行业的应用进行了深入分析;实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地...