netty 3.10.4.Final Source Code Study Summary

 
Note: this is my personal understanding and covers only the NIO transport.


Overall, the design boils down to one "core interface" and two kinds of "core interface helpers":

(1) ChannelFactory (NioClientSocketChannelFactory and NioServerSocketChannelFactory)

Role: initializes the boss and worker threads.

(2) Channels and DefaultChannelPipeline

Role: uniformly dispatches the ChannelEvents raised by the underlying I/O threads to the handlers (upstream + downstream) or to the ChannelSink (downstream). Through the Future-Listener mechanism, the user can conveniently obtain the result of an I/O operation, either by querying the future directly or by being notified (see the sketch after this list).

(3) ChannelSink (NioClientSocketPipelineSink and NioServerSocketPipelineSink): carries out the client-side ClientBootstrap connect operation, the server-side ServerBootstrap bind operation, and the write of downstream messages.
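
To make the three pieces concrete, here is a minimal Netty 3 echo-server sketch (the handler logic and port 8080 are my own choices for illustration; the factory, pipeline, and Future-Listener calls are Netty 3's public API). The two Executors become the boss and worker pools, write() returns a ChannelFuture whose listener fires when the I/O completes, and bind() is ultimately executed by the server-side ChannelSink.

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class EchoServer {
    public static void main(String[] args) {
        // (1) ChannelFactory: the two executors back the boss (accept)
        // and worker (read/write) thread pools.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        // (2) Channels/DefaultChannelPipeline: one pipeline per channel.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new SimpleChannelHandler() {
                    @Override
                    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                        // write() returns immediately with a future; the listener
                        // is notified once the underlying I/O has completed.
                        ChannelFuture future = e.getChannel().write(e.getMessage());
                        future.addListener(new ChannelFutureListener() {
                            public void operationComplete(ChannelFuture f) {
                                System.out.println("echo written, success=" + f.isSuccess());
                            }
                        });
                    }
                });
            }
        });

        // (3) ChannelSink: the bind is eventually carried out by the server-side sink.
        bootstrap.bind(new InetSocketAddress(8080));
    }
}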

Details:

1. Server-side read: data is read by polling the channel in a loop.

Key code:

while ((ret = ch.read(bb)) > 0) {
    readBytes += ret;
    if (!bb.hasRemaining()) {
        break;
    }
}


    @Override
    protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();

        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;

        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
        try {
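            // Poll the channel: keep reading until read() returns 0 (drained) or the buffer is full.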
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (readBytes > 0) {
            bb.flip();

            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(0, bb);
            buffer.writerIndex(readBytes);

            // Update the predictor.
            predictor.previousReceiveBufferSize(readBytes);

            // Fire the event.
            fireMessageReceived(channel, buffer);
        }

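        // ret < 0 means the peer closed the connection (EOF), so cancel the key and close.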
        if (ret < 0 || failure) {
            k.cancel(); // Some JDK implementations run into an infinite loop without this.
            close(channel, succeededFuture(channel));
            return false;
        }

        return true;
    }




2. Server-side write: the data is first appended to a per-channel buffer queue, then synchronously transferred to the OS kernel:

Key code:

(1) synchronized (channel.writeLock)

(2) localWrittenBytes = buf.transferTo(ch);


    protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;
        boolean addOpWrite = false;
        boolean removeOpWrite = false;
        boolean iothread = isIoThread(channel);

        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        List<Throwable> causes = null;

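        // writeLock serializes this flush loop with write() calls made from business threads.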
        synchronized (channel.writeLock) {
            channel.inWriteNowLoop = true;
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();

                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;
                    }

                    long localWrittenBytes = 0;
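                    // Spin up to writeSpinCount times: a non-blocking write can
                    // transfer 0 bytes while the kernel send buffer is full.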
                    for (int i = writeSpinCount; i > 0; i --) {
                        localWrittenBytes = buf.transferTo(ch);
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (writtenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    buf = null;
                    //noinspection UnusedAssignment
                    evt = null;
                    if (future != null) {
                        future.setFailure(t);
                    }
                    if (iothread) {
                        if (causes == null) {
                            causes = new ArrayList<Throwable>(1);
                        }
                        causes.add(t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        open = false;
                    }
                }
            }
            channel.inWriteNowLoop = false;
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        if (causes != null) {
            for (Throwable cause: causes) {
                // notify about cause now as it was triggered in the write loop
                fireExceptionCaught(channel, cause);
            }
        }
        if (!open) {
            // close the channel now
            close(channel, succeededFuture(channel));
        }
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }


3. Server-side handlers are all shared. Reads have no concurrency problem, because data is polled off the channel in the I/O thread and the read call does not return until the message has been placed into the business thread's queue. Writes do have a concurrency problem: writes are performed by business threads, and a business typically processes messages in parallel on a thread pool, so at any given moment several business threads may be operating on the same ChannelHandler. We therefore have to protect the ChannelHandler against concurrent access, usually with a lock; if the synchronized block is scoped badly it can become a severe performance bottleneck. This demands a lot from the developer's skills and lowers development efficiency (a sketch follows).
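
A minimal sketch of that protection, assuming a hypothetical shared handler that counts outgoing bytes: the shared member is guarded by the smallest possible synchronized block, and the actual write is forwarded outside the lock.

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

// One instance of this handler is shared by every pipeline.
public class SharedStatsHandler extends SimpleChannelHandler {

    // Shared mutable member: reached concurrently by the business
    // threads that call write(), so it must be guarded.
    private long bytesWritten;

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object msg = e.getMessage();
        int size = msg instanceof ChannelBuffer ? ((ChannelBuffer) msg).readableBytes() : 0;
        synchronized (this) {         // keep the critical section minimal
            bytesWritten += size;
        }
        super.writeRequested(ctx, e); // forward the write outside the lock
    }
}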

4. The write-path concurrency problem in netty 3.10 concerns a ChannelHandler's member variables: local variables live in stack memory and disappear when the method returns, while member variables live inside objects on the heap and stay until the garbage collector reclaims them. I found exactly this problem in the httpHandler of the 58同城 RPC framework; presumably they never use the file-upload feature, which is why the bug has never been fixed. A hypothetical illustration follows.
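
A hypothetical illustration of the member-vs-local distinction (this is not the actual 58同城 code; the upload handler below is invented): a member variable lives on the heap inside the shared handler instance and is visible to every channel, so concurrent uploads interleave their chunks, whereas a local variable lives on the calling thread's stack and cannot be shared. In Netty 3, per-channel state can be kept in the ChannelHandlerContext attachment instead.

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class UploadHandler extends SimpleChannelHandler {

    // BUG if this instance is shared: the member lives on the heap and is
    // visible to every channel, so concurrent uploads mix their chunks here.
    private final ChannelBuffer body = ChannelBuffers.dynamicBuffer();

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer chunk = (ChannelBuffer) e.getMessage(); // local: stack-confined, safe
        body.writeBytes(chunk);                               // heap member: racy accumulation

        // One fix: keep per-channel state in the context attachment instead, e.g.
        // ChannelBuffer perChannel = (ChannelBuffer) ctx.getAttachment();
    }
}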

5. To quote 李林峰 (Li Linfeng): in Netty 3, upstream events run in the I/O thread while downstream events run in the business thread. When Netty reads a datagram from the network and delivers it to a business handler, the handler executes in the I/O thread; when we call write or writeAndFlush from business code to send a message, the handlers execute in the business thread, and the business thread does not return until the last header handler has put the message into the send queue.


6. A business thread only needs to hold the Channel object to write response data into that channel's writeBufferQueue, so the upper layer can focus almost entirely on business logic. A minimal sketch follows.
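
The sketch below shows that hand-off, with a hypothetical business pool and echo-style business logic: the I/O thread captures the Channel and submits the request to the pool; the business thread later calls channel.write(), which enqueues the response into the channel's writeBufferQueue for write0 above to flush.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class BusinessDispatchHandler extends SimpleChannelHandler {

    private static final ExecutorService businessPool = Executors.newFixedThreadPool(8);

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        final Channel channel = e.getChannel(); // all the business thread needs to hold
        final Object request = e.getMessage();

        businessPool.execute(new Runnable() {
            public void run() {
                Object response = handle(request); // business logic off the I/O thread
                channel.write(response);           // enqueues into writeBufferQueue
            }
        });
    }

    private Object handle(Object request) {
        return request; // hypothetical echo business logic
    }
}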