`
wh0426
  • 浏览: 55083 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
Group-logo
架构师的知识与实践
浏览量:55072
社区版块
存档分类
最新评论

netty实现http api功能

阅读更多

无可致疑,netty是java的网络通讯框架,支持高并发。本文扫描使用netty完成简单的http的能力,不涉及安全,业务过滤等内容。

片段1

	/**
	 * 启动http服务器
	 * @throws InterruptedException
	 */
	private void runHttpServer(final EventProducer evtProducer) throws InterruptedException {
		
		// 配置TCP服务器.
		EventLoopGroup bossGroup = new NioEventLoopGroup(ServerBootOption.Parent_EventLoopGroup_ThreadNum);
		EventLoopGroup workerGroup = new NioEventLoopGroup(ServerBootOption.Child_EventLoopGroup_ThreadNum);
		try {

			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.option(ChannelOption.SO_BACKLOG, 10240)
					.option(ChannelOption.TCP_NODELAY, true)
					.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
					.option(ChannelOption.SO_REUSEADDR, true)
					.option(ChannelOption.SO_KEEPALIVE, true)
					.option(ChannelOption.SO_LINGER, 0)
					//.childOption(ChannelOption.SO_TIMEOUT, ServerBootOption.SO_TIMEOUT)
					// .handler(new LoggingHandler(LogLevel.INFO))
					.childHandler(new ChannelInitializer<SocketChannel>(){
						 @Override 
					 	 protected void initChannel(SocketChannel ch) throws Exception {
							 ChannelPipeline p = ch.pipeline();
							 p.addLast("idleStateHandler", new IdleStateHandler(60, 60, 30));<span style="color:#FF0000;">//读信道空闲60s,写信道空闲60s,读,写信道空闲30s</span>
							 p.addLast("http_server_codec", new HttpServerCodec());//<span style="color:#FF0000;">http消息转换</span>
						         p.addLast("http_server_handler",new HttpProxyServerHandler(manager,evtProducer));//<span style="color:#FF0000;">消息处理器</span>
						 }
			  
					});
			 
			// Start the tcp server.
			ChannelFuture f = b.bind(new InetSocketAddress(ip, port)).sync();//<span style="color:#FF0000;">启动http服务进程</span>
			logger.info("start http server ok");
			// Wait until the server socket is closed.
			f.channel().closeFuture().sync();
		} finally {
			// Shut down all event loops to terminate all threads.
			logger.info("Shut down all event loops to terminate all threads.");
                         bossGroup.shutdownGracefully();//关闭服务进程
			workerGroup.shutdownGracefully();//关闭服务进程
		}
	}

片段2HttpProxyServerHandler

 

 

/**
 * 
 * @author 
 * http 请求事件处理器,负责分发http请求事件
 */
public class HttpProxyServerHandler extends ChannelHandlerAdapter{
	private static final Logger logger = Logger.getLogger(HttpProxyServerHandler.class);
	private SessionContextManager manager;<span style="color:#FF0000;">//会话管理</span>
	private final AttributeKey<Long> timestamp = AttributeKey.valueOf("timestamp");
	private final StringBuilder buf = new StringBuilder();
	public HttpProxyServerHandler(SessionContextManager manager){
		this.manager=manager;
	}
	@Override
    public void handlerAdded(final ChannelHandlerContext ctx)
            throws Exception{
    	 logger.info("["+ctx.channel().id().asLongText()+" ] is Added ");
    }
	@Override <span style="color:#FF0000;">会话关闭,失效事件</span>
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		 manager.getSession().remove(ctx.channel().id().asLongText());
		 super.channelInactive(ctx);
	}
	@Override <span style="color:#FF0000;">读消息</span><span style="color:#FF0000;">事件,业务处理的入口</span>
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    	logger.info("HttpProxyServerHandler[channelRead] ");
    	Attribute<Long> attr_timestamp = ctx.channel().attr(timestamp);
    	attr_timestamp.set(System.currentTimeMillis());//<span style="color:#FF0000;">测试信道中保存状态消息</span>,<span style="color:#FF0000;">如消息处理的开如时间</span>
        SessionContext sctx=new SessionContext();
    	sctx.setCtx(ctx);
    	sctx.setChannelId(ctx.channel().id());
    	sctx.setSessionActiveTime(""+System.currentTimeMillis());
    	manager.getSession().put(sctx.getChannelId().asLongText(), sctx);//<span style="color:#FF0000;">manager.getSession() 为并发Map结构,用于会话保持</span>
    	logger.info(sctx.getChannelId().asLongText()+" req time "+System.currentTimeMillis());
    	try {
    		dispatchMeesage(ctx,manager ,msg);
        } finally {
         	 ReferenceCountUtil.release(msg);
        }

    }
	private QueryStringDecoder qdecoder =   null;
	private String uri="";//http uri
	private String url="";//http url
	private Map<String,List<String>> httpReqParams=null;//http请求参数
	/**
	 * 消息分发
	 * @param ctx
	 * @param manager
	 * @param msg 消息
	 * @return
	 * @throws Exception
	 */
	private String  dispatchMeesage(final ChannelHandlerContext ctx,
			final SessionContextManager manager, final Object msg) 
			throws Exception{
		String decode_message = "";
		HttpResponseUtils util = new HttpResponseUtils();
		String result_code="1";
		
		if (msg instanceof HttpRequest) {// http请求头
			HttpRequest req = (HttpRequest) msg;
			url = req.getUri();
			if (url.equals("/favicon.ico")) {
	        	ctx.close();
	            return "0";
			}
			if(!url.equals("/billing")){
				ctx.close();
				return "0";
			}
			//if (is100ContinueExpected(req)) {
	           // ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
	       // }
			qdecoder = new QueryStringDecoder(url);
			httpReqParams = qdecoder.parameters();
			uri = qdecoder.path();
			/* TODO:身份认证
			 * if(qdecoder.parameters().containsKey("crendial")){
			 * crendial=(String
			 * )qdecoder.parameters().get("crendial").get(0).toUpperCase(); }
			 */
			
		} else if (msg instanceof HttpContent) { // http请求体
			
			HttpContent httpContent = (HttpContent) msg;
			ByteBuf content = httpContent.content();
			if (content.isReadable()) {
				String chunk_message=content.toString(CharsetUtil.UTF_8);
		                 buf.append(chunk_message);
			}
			
		       if (!(msg instanceof LastHttpContent)) {//<span style="color:#FF0000;">不是最后一个chunk包块,此项非常 重要,当http分多片传输时,需要将多块内容合并</span>
	       		return "0";
	       	       }else{
	       		decode_message= buf.toString(); //
	       		logger.info(decode_message);
	       	      }
	         
			if (msg instanceof LastHttpContent) {
				//LastHttpContent trailer = (LastHttpContent) msg;
				
				String sessionId=ctx.channel().id().asLongText();
				System.out.println("请求"+decode_message);
				System.out.println("请求参数"+httpReqParams);
				System.out.println("请求地址"+uri);
				System.out.println("会话Id"+sessionId);
				//<span style="color:#FF0000;">TODO:模拟发送请求消息给后端处理,可以放入消息队列,ctx对象进入会话保持(Map对象)中,等消息队列中处理完成后,恢复会话,并完成消息应答。</span>
				
			}
			
		}
		return result_code;
	}
	private static AtomicInteger counter = new AtomicInteger(0);
	@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		manager.getSession().remove(ctx.channel().id().asLongText());
        cause.printStackTrace();
        ctx.close();
        
    }
	@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    	logger.error("HttpProxyServerHandler[userEventTriggered] ");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.ALL_IDLE) {
            	logger.error("ALL_IDLE");

            } else if (e.state() == IdleState.READER_IDLE){
            	logger.error("READER_IDLE");
    
            }else if (e.state() == IdleState.WRITER_IDLE){
            	logger.error("WRITER_IDLE");
            }
        	
        	ctx.close();
        }else if(evt instanceof ErrorEvent){
        	logger.error(((ErrorEvent) evt).getErrorCode());
        	
        	ctx.close();
        }
        
    }
}


SessionContext

 

 

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
/**
 * http请求会话
 * @author wanghao
 *
 */
public class SessionContext {
	private ChannelId channelId =null ;/**会话 channel ID*/
	private ChannelHandlerContext ctx;/**会话*/
	private String sessionActiveTime;/**会话创建时间*/
	private String sessionUnActiveTime;/**会话失效时间*/
	
	public String getSessionActiveTime() {
		return sessionActiveTime;
	}
	public void setSessionActiveTime(String sessionActiveTime) {
		this.sessionActiveTime = sessionActiveTime;
	}
	public String getSessionUnActiveTime() {
		return sessionUnActiveTime;
	}
	public void setSessionunActiveTime(String sessionUnActiveTime) {
		this.sessionUnActiveTime = sessionUnActiveTime;
	}

	
	public ChannelHandlerContext getCtx() {
		return ctx;
	}
	public void setCtx(ChannelHandlerContext ctx) {
		this.ctx = ctx;
	}
	
	
	public ChannelId getChannelId() {
		return channelId;
	}
	public void setChannelId(ChannelId channelId) {
		this.channelId = channelId;
	}
	
}



 

http请求会话管理器

import java.util.Map;
/**
 * http请求会话管理器
 * @author 
 *
 */
public interface SessionContextManager {
	abstract Map<String,SessionContext> getSession();
}



 

 

分享到:
评论

相关推荐

    netty权威指南 第1版(李林峰) + 源码

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。本书适合架构师、设计师、软件开发工程师、测试人员和其他对Java NIO框架...

    Netty权威指南高清完整版PDF

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。 《Netty权威指南》适合架构师、设计师、软件开发工程师、测试人员和其他...

    Java + Netty 实现的高并发高可用MQTT服务broker,轻松支持10万并发(有群友实现了130万在线).zip

    Java + Netty 实现的高并发高可用MQTT服务broker,轻松支持10万并发(有群友实现了130万在线).zip 功能说明: 参考MQTT3.1.1规范实现 完整的QoS服务质量等级实现 遗嘱消息, 保留消息及消息分发重试 心跳机制 MQTT连接...

    Netty权威指南书中案例

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。本书适合架构师、设计师、软件开发工程师、测试人员和其他对Java NIO框架...

    netty权威指南

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。, 《Netty权威指南》适合架构师、设计师、软件开发工程师、测试人员和其他...

    Netty权威指南

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。, 《Netty权威指南》适合架构师、设计师、软件开发工程师、测试人员和其他...

    netty权威指南 第2版(李林峰) + 源码

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。, 《Netty权威指南》适合架构师、设计师、软件开发工程师、测试人员和其他...

    masker-rest:SDK:基于Netty实现Http Server,极简API发布Rest服务及Websocket服务

    掩蔽休息项目简介:基于Netty实现Http Server框架,极简API发布Rest服务及Websocket服务(端口可替换)主要功能: Http服务器框架rest请求处理注册与分配(同一端口支持发布多个context-path服务),支持重定向,...

    Netty权威指南第2版

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。本书适合架构师、设计师、软件开发工程师、测试人员和其他对Java NIO框架...

    Netty权威指南(高清版)

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。本书适合架构师、设计师、软件开发工程师、测试人员和其他对Java NIO框架...

    Netty权威指南.part1.rar

    实战方面,从第一个Netty入门程序到私有协议栈的设计和开发,通过实际例程,由浅入深地对Netty的核心API和类库的功能和用法进行了细致讲解。本书适合架构师、设计师、软件开发工程师、测试人员和其他对Java NIO框架...

Global site tag (gtag.js) - Google Analytics