`
xiaoliang330
  • 浏览: 112062 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Netty服务构建实践

 
阅读更多

用main的形式在服务器上启动了一个netty服务,有端口有地址,可请求


package com.mazing.wx;

import java.io.IOException;
import java.io.InputStream;

import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

/**
 *
 */
public class WxHttpServer {

	static {//指定初始化哪个log配置
		try {
			InputStream is = WxHttpServer.class.getResourceAsStream("/log4j2-wx.xml");
			ConfigurationSource cs = new ConfigurationSource(is);
			Configurator.initialize(null, cs);
		} catch (IOException e) {
			System.err.println(e);
		}
	}

	private static final Logger logger = LoggerFactory.getLogger(WxHttpServer.class);

    public void start(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // server端发送的是httpResponse,所以要使用HttpResponseEncoder进行编码
                            ch.pipeline().addLast(new HttpResponseEncoder());
                            // server端接收到的是httpRequest,所以要使用HttpRequestDecoder进行解码
                            ch.pipeline().addLast(new HttpRequestDecoder());
                            ch.pipeline().addLast(new HttpServerInboundHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

        logger.info("Http server stopped ...");
    }

    public static void main(String[] args) throws Exception {
    	logger.info("http main ...");
        WxHttpServer server = new WxHttpServer();
        final WxAccessTokenReaderThread thread = new WxAccessTokenReaderThread();
        thread.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info("shutdowning ...");
                thread.stopRunning();
            }
        });
        logger.info("Http Server listening on 20090 ...");
        server.start(20090);

    }
}





package com.mazing.wx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * 
 */
public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {

    private static Logger logger = LoggerFactory.getLogger(HttpServerInboundHandler.class);

    private HttpRequest request;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (msg instanceof HttpRequest) {
            request = (HttpRequest) msg;

            String uri = request.getUri();

            InetSocketAddress insocket = (InetSocketAddress) ctx.channel()
                    .remoteAddress();
            String clientIP = insocket.getAddress().getHostAddress();
            logger.info("uri: {}, ip: {}", uri, clientIP);
        }

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            ByteBuf buf = content.content();
            buf.release();

            String text = WxAccessTokenReaderThread.wxAccessToken + "|" + WxAccessTokenReaderThread.wxJsapiTicket;
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
                    OK, Unpooled.wrappedBuffer(text.getBytes("UTF-8")));
            response.headers().set(CONTENT_TYPE, "text/plain");
            response.headers().set(CONTENT_LENGTH,
                    response.content().readableBytes());
            if (HttpHeaders.isKeepAlive(request)) {
                response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);
            ctx.flush();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(cause.getMessage());
        ctx.close();
    }

}





package com.mazing.wx;

import com.fasterxml.jackson.core.type.TypeReference;
import com.mazing.CommonConstants;
import com.mazing.commons.utils.HttpClientUtils;
import com.mazing.commons.utils.JsonUtils;
import com.mazing.commons.utils.cfg.DesPropertiesEncoder;
import com.mazing.core.remote.config.Config;
import com.mazing.core.web.RestResult;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 读取微信 access token 的线程
 *
 */
public class WxAccessTokenReaderThread extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(WxAccessTokenReaderThread.class);

    private boolean running = true;

    /**
     * 获取间隔
     */
    private static final int INTERVAL = 2 * 60 * 1000;

    // access token 在 7200秒过期,
    private long TOKEN_TIMEOUT = 7200 * 1000L - INTERVAL;

    private long lastReadTime = 0L;

    private String appid = null;

    private String secret = null;

    /**
     * access token
     */
    static volatile String wxAccessToken = "";

    /**
     * jsapi_ticket
     */
    static volatile String wxJsapiTicket = "";

    public WxAccessTokenReaderThread() {
        super("WxAccessTokenReaderThread");
    }

    public void stopRunning() {
        running = false;
        interrupt();
    }

    @Override
    public void run() {
        logger.info("WxAccessTokenReaderThread started ...");
        if (!readConfig()) {
            logger.error("读取配置错误,线程结束,请配置 group=weixin_mp 的内容。 ");
            running = false;
        }
        while (running) {
            long now = System.currentTimeMillis();
            if (now - lastReadTime >= TOKEN_TIMEOUT) {
                try {
                    readToken();
                    readTicket();

                    lastReadTime = now;
                } catch (Exception e) {
                    logger.error("读取微信 access token / jsapi ticket 错误", e);
                }
            }
            try {
                sleep(INTERVAL);
            } catch (InterruptedException e) {
                break;
            }
        }
        logger.info("WxAccessTokenReaderThread stopped ...");
    }

    /**
     * 读取配置,只读一次
     * @throws Exception
     */
    public boolean readConfig() {
        String json = HttpClientUtils.doGet(CommonConstants.CONFIG_DOMAIN + "/api/base/allConfigs");
//        logger.info("wx#readConfig | http response | result: {}", json.substring(0, 10) + "***");
        RestResult<List<Config>> result = JsonUtils.parseObject(json, new TypeReference<RestResult<List<Config>>>() {
        });

        if (!(result.isSuccess())) {
            logger.warn("wx#readConfig#Failure | Failure Request | result: {}", json);
           return false;
        }

        final String groupCode = "weixin_mp";
        DesPropertiesEncoder decoder = new DesPropertiesEncoder();
        int found = 0;
        for (Config config : result.getObject()) {
            if (groupCode.equals(config.getGroupCode())) {
                if ("appid".equalsIgnoreCase(config.getConfigKey())) {
                    appid = decoder.decode(config.getConfigValue());
                    found++;
                }
                else if ("secret".equalsIgnoreCase(config.getConfigKey())) {
                    secret = decoder.decode(config.getConfigValue());
                    found++;
                }
            }
            if (found >= 2) break;
        }
        return found >= 2;
    }
    /**
     * 读取 access token
     */
    private void readToken() {
        Map<String, String> params = new HashMap<>(4);
        params.put("grant_type", "client_credential");
        params.put("appid", appid);
        params.put("secret", secret);
        String json = HttpClientUtils.doGet("https://api.weixin.qq.com/cgi-bin/token", 10000, params);
        if (StringUtils.isBlank(json)) {
            logger.error("读取微信 access token 错误");
        }
        Map<String, Object> map = JsonUtils.parseObject(json.trim(), new TypeReference<Map<String, Object>>() {
        });
        if (map.containsKey("access_token")) {
            wxAccessToken = (String) map.get("access_token");
            logger.info("读取微信 access token 成功");
            if (map.containsKey("expires_in")) {
                int expiresIn = ((Number)map.get("expires_in")).intValue();
                logger.info("过期时间 (s):" + expiresIn);
                TOKEN_TIMEOUT = expiresIn * 1000L - INTERVAL;
                if (TOKEN_TIMEOUT < 1000) {
                    TOKEN_TIMEOUT = 10 * 60 * 1000L - INTERVAL;
                }
            }

        }
        else {
            logger.error("读取微信 access token 错误:" + json);
        }
    }

    /**
     * 读取 jsapi_ticket
     */
    private void readTicket() {
        Map<String, String> params = new HashMap<>(4);
        params.put("access_token", wxAccessToken);
        params.put("type", "jsapi");
        String json = HttpClientUtils.doGet("https://api.weixin.qq.com/cgi-bin/ticket/getticket", 10000, params);
        Map<String, Object> map = JsonUtils.parseObject(json.trim(), new TypeReference<Map<String, Object>>() {
        });
        int errcode = -1;
        if (map.get("errcode") != null) {
            errcode = ((Number) map.get("errcode")).intValue();
            logger.info("读取微信 jsapi_ticket 结果,errcode: {}, errmsg: {}", errcode, map.get("errmsg"));
            if (errcode == 0) {
                wxJsapiTicket = (String) map.get("ticket");
                logger.info("读取微信 jsapi_ticket 成功!");
            }
        }
        else {
            logger.error("读取微信 jsapi_ticket 错误:" + json);
        }
    }
}





以上代码在启动WxHttpServer 中的main后,也就相当于启动了端口为20090的服务了,  可以通过请求如:
http://123.40.50.60:20090/  获取到 HttpServerInboundHandler 中写入的参数

WeixinAccessToken.ACCESS_TOKEN  和
                    WeixinAccessToken.JSAPI_TICKET






分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics