- 浏览: 950829 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
引言:
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。
添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
看完了抽象Unsafe定义和通道Outbound缓冲区,今天我们回到抽象通道,接着抽象通道初始化,继续看抽象通道的其他方法
先把抽象通道的变量声明贴出来及构造,以便理解:
上面抽象通道的变量和构造有单独的文章讲述,这里不再说:
来看其他方法
在上面我们看到好多方法时委托给通道的Channel管道,这是因为:
即通道实际上是一个ChannelOutboundInvoker,当通道相关操作发生时,触发通道相关操作事件。
从上面可以看出,通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
总结:
通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
引言:
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。
添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
看完了抽象Unsafe定义和通道Outbound缓冲区,今天我们回到抽象通道,接着抽象通道初始化,继续看抽象通道的其他方法
先把抽象通道的变量声明贴出来及构造,以便理解:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class); private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道关闭异常 private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");//确保通道打开方法调用时,通道关闭异常 private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "close(...)");//close方法调用时,通道关闭异常 private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "write(...)");//write方法调用时,通道关闭异常 private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace( new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道还未连接异常 private final Channel parent;//所属通道 private final ChannelId id;//通道id private final Unsafe unsafe;//硬件底层操作类 private final DefaultChannelPipeline pipeline;//Channel管道 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);//空异步任务 private final CloseFuture closeFuture = new CloseFuture(this);//异步关闭任务 private volatile SocketAddress localAddress;//本地socket地址 private volatile SocketAddress remoteAddress;//远端socket地址 private volatile EventLoop eventLoop;//通道注册的事件循环 private volatile boolean registered;//是否注册 /** Cache for the string representation of this channel */ private boolean strValActive; private String strVal; ... /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent) { this.parent = parent; //创建通道id id = newId(); //创建底层操作类unsafe unsafe = newUnsafe(); //新建Channel管道 pipeline = newChannelPipeline(); } /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent, ChannelId id) { this.parent = parent; this.id = id; unsafe = newUnsafe(); pipeline = newChannelPipeline(); } }
上面抽象通道的变量和构造有单独的文章讲述,这里不再说:
来看其他方法
//判断通道是否可写 @Override public boolean isWritable() { //如果unsafe关联的通道Outbound 缓冲区不为空,且可写返回true ChannelOutboundBuffer buf = unsafe.outboundBuffer(); return buf != null && buf.isWritable(); } //直到通道可写前,通道Outbound 缓冲区的字节数。如果通道不可写,则返回0 @Override public long bytesBeforeUnwritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. // We should be consistent with that here. return buf != null ? buf.bytesBeforeUnwritable() : 0; } //获取直到通道可写,通道底层buf有多少字节数据需要发送。如果可写返回0 @Override public long bytesBeforeWritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. // We should be consistent with that here. return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE; } //获取所属通道 @Override public Channel parent() { return parent; } //获取通道内部Channel管道 @Override public ChannelPipeline pipeline() { return pipeline; } //获取通道字节buf分配器 @Override public ByteBufAllocator alloc() { return config().getAllocator(); } //获取通道所在的事件循环 @Override public EventLoop eventLoop() { EventLoop eventLoop = this.eventLoop; if (eventLoop == null) { throw new IllegalStateException("channel not registered to an event loop"); } return eventLoop; } //获取通道本地地址 @Override public SocketAddress localAddress() { SocketAddress localAddress = this.localAddress; if (localAddress == null) { try { this.localAddress = localAddress = unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return localAddress; } /** * @deprecated no use-case for this. */ @Deprecated protected void invalidateLocalAddress() { localAddress = null; } //获取远端socket地址 @Override public SocketAddress remoteAddress() { SocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { try { this.remoteAddress = remoteAddress = unsafe().remoteAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return remoteAddress; } /** * @deprecated no use-case for this. */ @Deprecated protected void invalidateRemoteAddress() { remoteAddress = null; } //判断通道是否注册到事件循环 @Override public boolean isRegistered() { return registered; } //绑定本地socket地址 @Override public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); } //连接远端socket地址 @Override public ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress); } //上面两个方法的合体 @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return pipeline.connect(remoteAddress, localAddress); } //断开通道连接 @Override public ChannelFuture disconnect() { return pipeline.disconnect(); } //关闭通道 @Override public ChannelFuture close() { return pipeline.close(); } //从事件循环反注册 @Override public ChannelFuture deregister() { return pipeline.deregister(); } //刷新通道 @Override public Channel flush() { pipeline.flush(); return this; } 下面几个方法与上面不同是,带了一个异步任务结果,在操作完成时,通道任务结果 @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); } @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, localAddress, promise); } @Override public ChannelFuture disconnect(ChannelPromise promise) { return pipeline.disconnect(promise); } @Override public ChannelFuture close(ChannelPromise promise) { return pipeline.close(promise); } @Override public ChannelFuture deregister(ChannelPromise promise) { return pipeline.deregister(promise); } //通道读操作 @Override public Channel read() { pipeline.read(); return this; } //写消息 @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); } @Override public ChannelFuture write(Object msg, ChannelPromise promise) { return pipeline.write(msg, promise); } //写并刷新消息 @Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return pipeline.writeAndFlush(msg, promise); } //创建异步可写任务 @Override public ChannelPromise newPromise() { return pipeline.newPromise(); } //创建异步可写进度任务 @Override public ChannelProgressivePromise newProgressivePromise() { return pipeline.newProgressivePromise(); } //创建已经成功的任务 @Override public ChannelFuture newSucceededFuture() { return pipeline.newSucceededFuture(); } //创建已经失败的任务 @Override public ChannelFuture newFailedFuture(Throwable cause) { return pipeline.newFailedFuture(cause); } //获取关闭异步任务 @Override public ChannelFuture closeFuture() { return closeFuture; } //获取通道unsafe @Override public Unsafe unsafe() { return unsafe; } //创建空异步任务 @Override public final ChannelPromise voidPromise() { return pipeline.voidPromise(); }
在上面我们看到好多方法时委托给通道的Channel管道,这是因为:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
即通道实际上是一个ChannelOutboundInvoker,当通道相关操作发生时,触发通道相关操作事件。
从上面可以看出,通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
总结:
通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1231netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 1970netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2361netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1255netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1253netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1519netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1759netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1325netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2748netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2095netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 1939netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1012netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1806netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1159netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1141netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 890netty 通道接口定义:http://donald-drap ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2125netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 992netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1789netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1791netty Inboudn/Outbound通道Invoker ...
相关推荐
学习netty源码,为后续rocketmq等学习打下基础
在公司做项目的时候发现用Netty进行TCP/IP通信的Netty客户端接收到的数据进制乱码,经过摸索,终于成功解决了这个鸡肋的问题
Netty基础,用于学习Netty,参考黑马程序员的netty教程
springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...
netty-3.2.5.Final.jar netty包
Netty入门教程
Netty (netty-netty-4.0.56.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。 Netty is a NIO ...
注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。
netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...
Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
Netty 教程 Netty权威指南 507页完整版
Netty (netty-netty-5.0.0.Alpha2.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
跟闪电侠学Netty:Netty即时聊天实战与底层原理-book-netty
Netty大纲-同步netty专栏
netty学习,netty进阶之路:实战,案列,进阶知识
netty代码demo
Netty进阶之路,跟着案例学Netty,李林峰大神新作。值得一读。
netty+4G DTU
Netty实战-Netty