`

Netty源码学习-ReadTimeoutHandler

阅读更多


ReadTimeoutHandler的实现思路:
开启一个定时任务,如果在指定时间内没有接收到消息,则抛出ReadTimeoutException
这个异常的捕获,在开发中,交给跟在ReadTimeoutHandler后面的ChannelHandler,例如

private final ChannelHandler timeoutHandler =
	new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
	new UptimeClientHandler(bootstrap, timer);

public ChannelPipeline getPipeline() throws Exception {
	return Channels.pipeline(
			timeoutHandler, uptimeHandler);
}

public class UptimeClientHandler ...{
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        Throwable cause = e.getCause();
        if (cause instanceof ReadTimeoutException) {
            // The connection was OK but there was no traffic for last period.
            println("Disconnecting due to no inbound traffic");
        } else {
            cause.printStackTrace();
        }
        ctx.getChannel().close();
    }
}


ReadTimeoutHandler的关键源码:
	
	//ChannelOpen时启动定时任务:
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        initialize(ctx);
        ctx.sendUpstream(e);
    }
    private void initialize(ChannelHandlerContext ctx) {
        State state = state(ctx);
        state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
    }

	
	//每次接收到消息时更新lastReadTime
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        State state = (State) ctx.getAttachment();
        state.lastReadTime = System.currentTimeMillis();
        ctx.sendUpstream(e);
    }
	
	/*定时任务:判断指定时间内是否有消息到达
		举例:
		假设超时时间设为30秒,初始的lastReadTime=10:00:00
		那么,定时任务在10:00:30时执行run方法,如果:
		1.在10:00:18有消息到达(lastReadTime更新为10:00:18),则表示没有超时,
		继续监听下一个30秒,也就是定时任务需要在10:00:48再跑一次
		因此下一次定时任务的执行距离现在是:nextDelay=30-(30-18)=18(秒)
		2.没有消息到达,超时,抛异常
	*/
	private final class ReadTimeoutTask implements TimerTask {
        public void run(Timeout timeout) throws Exception {
            State state = (State) ctx.getAttachment();
            long currentTime = System.currentTimeMillis();
            long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
            if (nextDelay <= 0) {
                // Read timed out - set a new timeout and notify the callback.
                state.timeout =
                    timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
                try {
                    // FIXME This should be called from an I/O thread.
                    //       To be fixed in Netty 4.
                    readTimedOut(ctx);
                } catch (Throwable t) {
                    fireExceptionCaught(ctx, t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                state.timeout =
                    timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
            }
        }
    }
	
	//为什么这里会调用initialize方法?分析在下面
	public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
        if (ctx.getPipeline().isAttached()) {
            // channelOpen event has been fired already, which means
            // this.channelOpen() will not be invoked.
            // We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelOpen event has not been fired yet.
            // this.channelOpen() will be invoked and initialization will occur there.
        }
    }
	
	


上面的beforeAdd方法不太好理解
先看看ClientBootstrap的connect方法:
	public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

        ChannelPipeline pipeline;
        try {
		
			//这里调用ChannelPipeline.addLast,在真正往链表里面插入之前,调用beforeAdd
            pipeline = getPipelineFactory().getPipeline();
        } catch (Exception e) {
            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
        }
		
		//创建一个代表Client的SocketChannel,SocketChannel的构造函数里会调用:
		//        pipeline.attach(this, sink);
		//然后会fireChannelOpen
        Channel ch = getFactory().newChannel(pipeline);
		
		//...
	}
	


从正常的流程来说,是先创建创建pipeline再创建channel,
也就是beforeAdd会在channel创建之前调用,那么beforeAdd里面的判断:
if (ctx.getPipeline().isAttached()) 就不会返回true(因为此时channel还未创建,更不可能与pipeline关联了)
这样看来,只需要在channelOpen中调用initialize就可以了?
不是的,
因为还有一种情况:动态添加ChannelHandler
有可能channel已经创建(与pipeline关联了),且channelOpen已经执行过了,
那就需要在添加ReadTimeoutHandler时,执行initialize


0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics