`

Netty源码学习-ChannelHandler

阅读更多


一般来说,“有状态”的ChannelHandler不应该是“共享”的,“无状态”的ChannelHandler则可“共享”
例如ObjectEncoder是“共享”的, 但 ObjectDecoder 不是
因为每一次调用decode方法时,可能数据未接收完全(incomplete),
它与上一次decode时接收到的数据“累计”起来才有可能是完整的数据,是“有状态”的

public class SomePipelineFactory implements ChannelPipelineFactory {

	//共享
	private final StringEncoder stringEncoder = new StringEncoder(CharsetUtil.UTF_8);
	
	//定义成static的也可以
	//private static final StringEncoder stringEncoder = new StringEncoder(CharsetUtil.UTF_8);
	
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
		pipeline.addLast("stringEncoder", stringEncoder);
		
		//不共享
        pipeline.addLast("handler", new SomeHandler());
        return pipeline;
    }
}


因此,每一次ChannelPipelineFactory.getPipeline被调用时,
如果ChannelHandler是“不共享”的,则会新创建一个ChannelHandler的实例

ChannelPipelineFactory.getPipeline在以下方法中调用:


//NioServerSocketPipelineSink.Boss.registerAcceptedChannel
//SocketChannel(创建了pipeline):
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {

				//调用PipelineFactory.getPipeline
                ChannelPipeline pipeline =
                    channel.getConfig().getPipelineFactory().getPipeline();
				NioWorker worker = nextWorker();
                worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);

}
	
//ServerBootstrap.bind	
//ServerSocketChannel(创建了pipeline):
public Channel bind(final SocketAddress localAddress) {

        ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);
        Channel channel = getFactory().newChannel(bossPipeline);
}



可见:
1.ServerSocketChannel与SocketChannel的pipeline实例不是同一个
每一个Channel都创建一个pipeline
2.ServerSocketChannel只有一个(如果同时绑定两个端口,例如同时提供80和443服务,则有两个)
与之关联的pipeline也只有一个,这pipeline里面的ChannelHandler只有“binder”(通常情况下)
3.SocketChannel会有多个:
socketChannelA--pipelineA(sharedHandler,newHandlerA0, newHandlerB0)
socketChannelB--pipelineB(sharedHandler,newHandlerA1, newHandlerB2)

socketChannelA和socketChannelB可能交由不同的workerThread来处理

因此如果ChannelHandler是“有状态”且“共享”,那就要注意线程同步的问题:

public class SharedHandler extends SimpleChannelUpstreamHandler  {
    
    private volatile int someState;
	//...
}


但查看org.jboss.netty.example.proxy.HexDumpProxyInboundHandler:
public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
    private volatile Channel outboundChannel;
	//...
}

public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline p = pipeline(); // Note the static import.
        p.addLast("handler", new HexDumpProxyInboundHandler(cf, remoteHost, remotePort));
        return p;
    }


不是很理解,照理说,HexDumpProxyInboundHandler是“不共享”的,应该不存在
线程同步的问题,但为什么outboundChannel要定义成“volatile”呢?
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics