netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue
public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { …… } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue assert offered; channel.worker.writeFromUserCode(channel); } }
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
public boolean offer(MessageEvent e) { boolean success = queue.offer(e); assert success; int messageSize = getMessageSize(e); int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); int highWaterMark = getConfig().getWriteBufferHighWaterMark(); if (newWriteBufferSize >= highWaterMark) { if (newWriteBufferSize - messageSize < highWaterMark) { highWaterMarkCounter.incrementAndGet(); if (!notifying.get()) { notifying.set(Boolean.TRUE); fireChannelInterestChanged(AbstractNioChannel.this); notifying.set(Boolean.FALSE); } } } return true; }
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
public boolean setOption(String key, Object value) { if (super.setOption(key, value)) { return true; } if ("writeBufferHighWaterMark".equals(key)) { setWriteBufferHighWaterMark0(ConversionUtil.toInt(value)); } else if ("writeBufferLowWaterMark".equals(key)) { setWriteBufferLowWaterMark0(ConversionUtil.toInt(value)); } else if ("writeSpinCount".equals(key)) { setWriteSpinCount(ConversionUtil.toInt(value)); } else if ("receiveBufferSizePredictorFactory".equals(key)) { setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value); } else if ("receiveBufferSizePredictor".equals(key)) { setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value); } else { return false; } return true; }
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
public MessageEvent poll() { MessageEvent e = queue.poll(); if (e != null) { int messageSize = getMessageSize(e); int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下 highWaterMarkCounter.decrementAndGet(); if (isConnected() && !notifying.get()) { notifying.set(Boolean.TRUE); fireChannelInterestChanged(AbstractNioChannel.this); notifying.set(Boolean.FALSE); } } } } return e; }
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
public boolean isWritable() { return (getInterestOps() & OP_WRITE) == 0; } public int getInterestOps() { if (!isOpen()) { return Channel.OP_WRITE; } int interestOps = getRawInterestOps(); int writeBufferSize = this.writeBufferSize.get(); if (writeBufferSize != 0) { if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1 int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作 interestOps |= Channel.OP_WRITE; } else { interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作 } } else {//超过高水位counter<=0,意味着当前数据量小于高水位 int highWaterMark = getConfig().getWriteBufferHighWaterMark(); if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作 interestOps |= Channel.OP_WRITE; } else { interestOps &= ~Channel.OP_WRITE; } } } else { interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作 } return interestOps; }
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
相关推荐
Netty 3.6.2.Final 稳定版本 含源码
包含翻译后的API文档:netty-3.6.2.Final-javadoc-API文档-中文(简体)版.zip; Maven坐标:io.netty:netty:3.6.2.Final; 标签:netty、jar包、java、中文文档; 使用方法:解压翻译后的API文档,用浏览器打开...
包含翻译后的API文档:netty-3.6.2.Final-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:io.netty:netty:3.6.2.Final; 标签:netty、jar包、java、API文档、中英对照版; 使用方法:解压翻译后的API...
java运行依赖jar包
这是一个java web项目集成了netty websocket的...初始化握手对象时指定了maxFramePayloadLength 的长度、以及通过配置netty内置解码器处理数据半包等方法,均无效。以下是终极解决办法,供大家参考和解决这样的问题。
Netty实现端口数据转发:3000进4000出
通过Netty4 获取串口数据并且下发数据到串口,是一个封装不错的框架
java实现基于netty 的utp字节数据接收服务,服务具体实现代码。样例java实现基于netty 的utp字节数据接收服务,服务具体实现代码。样例
Netty发送protoBuf格式数据 Netty发送protoBuf格式数据 Netty发送protoBuf格式数据 Netty发送protoBuf格式数据
netty服务器解析16进制数据
随着物联网的发展,随之出现了各种传感器监测数据的实时发送,需要和netty服务器通讯,netty和传感器之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息) 碰到的问题: netty作为服务器端如何...
java netty接收串口数据 开启windows串口工具 发送串口数据调试助手
Netty同步等待数据返回实例代码
使用Netty进行网络通信,完成分布式的数据采集任务,可以采集500以上数据节点。
基于netty 的udp字节数据接 收服务,发送服务实例 基于netty 的udp字节数据接收服务,发送服务实例
RPC是一种远程调用的通信协议,例如dubbo、thrift等,我们在互联网高并发应用开发时候都会使用到类似的服务。本专题主要通过三个章节实现一个rpc...- 手写RPC框架第二章《netty通信》 - 手写RPC框架第三章《RPC中间件》
netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》源码 ...
主要是自己写的大概的netty
netty案例,netty4.1中级拓展篇六《SpringBoot+Netty+Elasticsearch收集日志信息数据存储》源码 ...
netty 数据分包、组包、粘包处理机制(部分)1