`
xklin04
  • 浏览: 12527 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

初识Netty3.6

阅读更多
今天开始接触Netty这个NIO框架,由与MINA同一作者创作的框架,在看过User Guide 3.6之后,API风格与使用方式与MINA2大致类似。作为学习新框架的起始,先来几个Hello World程序吧。
看API依然是基于Reactor模型,程序分三部分
1.事件处理程序
主要包括对具体业务消息的处理,对各种连接状态的响应

  
    public class TimeServerHandler extends SimpleChannelHandler {
   
        @Override
       public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
            Channel ch = e.getChannel();
           
            ChannelBuffer time = ChannelBuffers.buffer(4);
          time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
            
          ChannelFuture f = ch.write(time);
            
          f.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) {
                  Channel ch = future.getChannel();
                    ch.close();
             }
            });
      }
    
      @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
          e.getCause().printStackTrace();
            e.getChannel().close();
      }
    }

2.编解码器
在大流量数据传输时的必备品,在流传输的基础根据业务情况分隔出数据包处理,ChannelBuffer转成POJO比较适合在解码器中进行。
package org.jboss.netty.example.time;
   
    public class TimeDecoder extends FrameDecoder(22) {
   
        @Override
       protected Object decode(
                ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
               
            if (buffer.readableBytes() < 4) {//如果不够数据,返回空对象,不处理
              return null; (24)
            }
          
            return buffer.readBytes(4);(25)
      }
    }


解码器不需要递归调用直到数据解析完毕,框架会自动调动直到把已接收的数据处理完毕。
另一个非常方便的解码器是ReplayingDecoder,它假设在解码时已经有足够的数据,如:
public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<VoidEnum> {

   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf,
                           VoidEnum state) throws Exception {

     return buf.readBytes(buf.readInt());
   }
 }




编码器使用如下
public class TimeEncoder extends SimpleChannelHandler {
   
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent(27) e) {
           UnixTime time = (UnixTime) e.getMessage();
            
          ChannelBuffer buf = buffer(4);
            buf.writeInt(time.getValue());
          
            Channels.write(ctx, e.getFuture(), buf);(28)
      }
    }


3.主体程序
服务器端:
public class DiscardServer {
    
      public static void main(String[] args) throws Exception {
            ChannelFactory factory =
              new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                      Executors.newCachedThreadPool());
    
          ServerBootstrap bootstrap = new ServerBootstrap(factory);
    
          bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                  return Channels.pipeline(new TimeDecoder(),new DiscardServerHandler());//此处注意是否有顺序要求
                }
          });
    
          bootstrap.setOption("child.tcpNoDelay", true);
            bootstrap.setOption("child.keepAlive", true);
  
            bootstrap.bind(new InetSocketAddress(8080));
      }
    }


服务器资源释放包括:释放socket相关资源、释放ChannelFactory占用的资源
Netty提供了ChannelGroup来管理所有活动的连接,如果连接关闭则自动从集合中删除。
 
 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
        TimeServer.allChannels.add(e.getChannel());(34)
   }
public class TimeServer {
   
        static final ChannelGroup allChannels = new DefaultChannelGroup("time-server"(35));
   
        public static void main(String[] args) throws Exception {
           ...
            ChannelFactory factory = ...;
          ServerBootstrap bootstrap = ...;
            ...
          Channel channel(36) = bootstrap.bind(...);
            allChannels.add(channel);(37)
          waitForShutdownCommand();(38)
            ChannelGroupFuture future = allChannels.close();(39)
          future.awaitUninterruptibly();
            factory.releaseExternalResources();
      }
    }



客户端:
 public class TimeClient {
    
       public static void main(String[] args) throws Exception {
            String host = args[0];
          int port = Integer.parseInt(args[1]);
    
          ChannelFactory factory =
                new NioClientSocketChannelFactory(
                      Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());
  
            ClientBootstrap bootstrap = new ClientBootstrap(16)(factory);
  
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
              public ChannelPipeline getPipeline() {
                    return Channels.pipeline(new TimeDecoder(),new TimeClientHandler());//此处注意是否有顺序要求
              }
            });
          
            bootstrap.setOption("tcpNoDelay"(17), true);
          bootstrap.setOption("keepAlive", true);
    
         ChannelFuture future= bootstrap.connect(18)(new InetSocketAddress(host, port));

future.awaitUninterruptibly();(30)
            if (!future.isSuccess()) {
              future.getCause().printStackTrace();(31)
            }
          future.getChannel().getCloseFuture().awaitUninterruptibly();(32)
            factory.releaseExternalResources();(33)
      }

        }
  }


参考文献:http://netty.io/3.6/guide/
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics