`

netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理

    博客分类:
  • java
阅读更多

        netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue

public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    if (e instanceof ChannelStateEvent) {
        ……
    } else if (e instanceof MessageEvent) {
        MessageEvent event = (MessageEvent) e;
        NioSocketChannel channel = (NioSocketChannel) event.getChannel();
        boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
        assert offered;
        channel.worker.writeFromUserCode(channel);
    }
}

        WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。

public boolean offer(MessageEvent e) {
    boolean success = queue.offer(e);
    assert success;

    int messageSize = getMessageSize(e);
    int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
    int highWaterMark =  getConfig().getWriteBufferHighWaterMark();

    if (newWriteBufferSize >= highWaterMark) {
        if (newWriteBufferSize - messageSize < highWaterMark) {
            highWaterMarkCounter.incrementAndGet();
            if (!notifying.get()) {
                notifying.set(Boolean.TRUE);
                fireChannelInterestChanged(AbstractNioChannel.this);
                notifying.set(Boolean.FALSE);
            }
        }
    }
    return true;
}

        fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K

public boolean setOption(String key, Object value) {
    if (super.setOption(key, value)) {
        return true;
    }
    if ("writeBufferHighWaterMark".equals(key)) {
        setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
    } else if ("writeBufferLowWaterMark".equals(key)) {
        setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
    } else if ("writeSpinCount".equals(key)) {
        setWriteSpinCount(ConversionUtil.toInt(value));
    } else if ("receiveBufferSizePredictorFactory".equals(key)) {
        setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
    } else if ("receiveBufferSizePredictor".equals(key)) {
        setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
    } else {
        return false;
    }
    return true;
}

        然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K

public MessageEvent poll() {
    MessageEvent e = queue.poll();
    if (e != null) {
        int messageSize = getMessageSize(e);
        int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
        int lowWaterMark = getConfig().getWriteBufferLowWaterMark();


        if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
            if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
                highWaterMarkCounter.decrementAndGet();
                if (isConnected() && !notifying.get()) {
                    notifying.set(Boolean.TRUE);
                    fireChannelInterestChanged(AbstractNioChannel.this);
                    notifying.set(Boolean.FALSE);
                }
            }
        }
    }
    return e;
}

         超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的

public boolean isWritable() {
    return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
    if (!isOpen()) {
        return Channel.OP_WRITE;
    }

    int interestOps = getRawInterestOps();
    int writeBufferSize = this.writeBufferSize.get();
    if (writeBufferSize != 0) {
        if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
            int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
            if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
                interestOps |= Channel.OP_WRITE;
            } else {
                interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
            }
        } else {//超过高水位counter<=0,意味着当前数据量小于高水位
            int highWaterMark = getConfig().getWriteBufferHighWaterMark();
            if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
                interestOps |= Channel.OP_WRITE;
            } else {
                interestOps &= ~Channel.OP_WRITE;
            }
        }
    } else {
        interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
    }

    return interestOps;
}

       即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写

        如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。

  • 大小: 52.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics