`

通过netty实现服务端与客户端的长连接通讯,及心跳检测。

 
阅读更多
基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent 事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

     设计消息类型:

public enum  MsgType {
    PING,ASK,REPLY,LOGIN
}
Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg  implements Serializable {
    private static final long serialVersionUID = 1L;
    private MsgType type;
    //必须唯一,否者会出现channel调用混乱
    private String clientId;

    //初始化客户端id
    public BaseMsg() {
        this.clientId = Constants.getClientId();
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public MsgType getType() {
        return type;
    }

    public void setType(MsgType type) {
        this.type = type;
    }
}
常量设置:

public class Constants {
    private static String clientId;
    public static String getClientId() {
        return clientId;
    }
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }
}
登录类型消息:
public class LoginMsg extends BaseMsg {
    private String userName;
    private String password;
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}
心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}
请求类型消息:

public class AskMsg extends BaseMsg {
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }
    private AskParams params;

    public AskParams getParams() {
        return params;
    }

    public void setParams(AskParams params) {
        this.params = params;
    }
}
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;
    private String auth;

    public String getAuth() {
        return auth;
    }

    public void setAuth(String auth) {
        this.auth = auth;
    }
}
响应类型消息:

public class ReplyMsg extends BaseMsg {
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }
    private ReplyBody body;

    public ReplyBody getBody() {
        return body;
    }

    public void setBody(ReplyBody body) {
        this.body = body;
    }
}
//相应类型body对像
public class ReplyBody implements Serializable {
    private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
    private String clientInfo;

    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    public String getClientInfo() {
        return clientInfo;
    }

    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }
}
public class ReplyServerBody extends ReplyBody {
    private String serverInfo;
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }
    public String getServerInfo() {
        return serverInfo;
    }
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }
}
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {
    private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
    public static void add(String clientId,SocketChannel socketChannel){
        map.put(clientId,socketChannel);
    }
    public static Channel get(String clientId){
       return map.get(clientId);
    }
    public static void remove(SocketChannel socketChannel){
        for (Map.Entry entry:map.entrySet()){
            if (entry.getValue()==socketChannel){
                map.remove(entry.getKey());
            }
        }
    }

}
Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //channel失效,从Map中移除
        NettyChannelMap.remove((SocketChannel)ctx.channel());
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

        if(MsgType.LOGIN.equals(baseMsg.getType())){
            LoginMsg loginMsg=(LoginMsg)baseMsg;
            if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
                //登录成功,把channel存到服务端的map中
                NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
                System.out.println("client"+loginMsg.getClientId()+" 登录成功");
            }
        }else{
            if(NettyChannelMap.get(baseMsg.getClientId())==null){
                    //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
                    LoginMsg loginMsg=new LoginMsg();
                    channelHandlerContext.channel().writeAndFlush(loginMsg);
            }
        }
        switch (baseMsg.getType()){
            case PING:{
                PingMsg pingMsg=(PingMsg)baseMsg;
                PingMsg replyPing=new PingMsg();
                NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
            }break;
            case ASK:{
                //收到客户端的请求
                AskMsg askMsg=(AskMsg)baseMsg;
                if("authToken".equals(askMsg.getParams().getAuth())){
                    ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
                    ReplyMsg replyMsg=new ReplyMsg();
                    replyMsg.setBody(replyBody);
                    NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
                }
            }break;
            case REPLY:{
                //收到客户端
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
                System.out.println("receive client msg: "+clientBody.getClientInfo());
            }break;
            default:break;
        }
        ReferenceCountUtil.release(baseMsg);
    }
}
ServerBootstrap:

public class NettyServerBootstrap {
    private int port;
    private SocketChannel socketChannel;
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    private void bind() throws InterruptedException {
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup worker=new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        //保持长连接状态
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline p = socketChannel.pipeline();
                p.addLast(new ObjectEncoder());
                p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                p.addLast(new NettyServerHandler());
            }
        });
        ChannelFuture f= bootstrap.bind(port).sync();
        if(f.isSuccess()){
            System.out.println("server start");
        }
    }
    public static void main(String []args) throws InterruptedException {
        NettyServerBootstrap bootstrap=new NettyServerBootstrap(9);
        while (true){
            SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
            if(channel!=null){
                AskMsg askMsg=new AskMsg();
                channel.writeAndFlush(askMsg);
            }
            TimeUnit.SECONDS.sleep(5);
        }
    }
}
3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
    //利用写空闲发送心跳检测消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    PingMsg pingMsg=new PingMsg();
                    ctx.writeAndFlush(pingMsg);
                    System.out.println("send ping to server-");
                    break;
                default:
                    break;
            }
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType msgType=baseMsg.getType();
        switch (msgType){
            case LOGIN:{
                //向服务器发起登录
                LoginMsg loginMsg=new LoginMsg();
                loginMsg.setPassword("yao");
                loginMsg.setUserName("robin");
                channelHandlerContext.writeAndFlush(loginMsg);
            }break;
            case PING:{
                System.out.println("receive ping from server-");
            }break;
            case ASK:{
                ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
                ReplyMsg replyMsg=new ReplyMsg();
                replyMsg.setBody(replyClientBody);
                channelHandlerContext.writeAndFlush(replyMsg);
            }break;
            case REPLY:{
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
                System.out.println("receive client msg: "+replyServerBody.getServerInfo());
            }
            default:break;
        }
        ReferenceCountUtil.release(msgType);
    }
}
bootstrap

public class NettyClientBootstrap {
    private int port;
    private String host;
    private SocketChannel socketChannel;
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);
        bootstrap.remoteAddress(host,port);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
                socketChannel.pipeline().addLast(new ObjectEncoder());
                socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                socketChannel.pipeline().addLast(new NettyClientHandler());
            }
        });
        ChannelFuture future =bootstrap.connect(host,port).sync();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel)future.channel();
            System.out.println("connect server  成功");
        }
    }
    public static void main(String[]args) throws InterruptedException {
        Constants.setClientId("001");
        NettyClientBootstrap bootstrap=new NettyClientBootstrap(9,"localhost");

        LoginMsg loginMsg=new LoginMsg();
        loginMsg.setPassword("yao");
        loginMsg.setUserName("robin");
        bootstrap.socketChannel.writeAndFlush(loginMsg);
        while (true){
            TimeUnit.SECONDS.sleep(3);
            AskMsg askMsg=new AskMsg();
            AskParams askParams=new AskParams();
            askParams.setAuth("authToken");
            askMsg.setParams(askParams);
            bootstrap.socketChannel.writeAndFlush(askMsg);
        }
    }
}
分享到:
评论

相关推荐

    Netty4长连接(服务端+客户端)

    Netty4长连接、断开重连、心跳检测、Msgpack编码解码 http://blog.csdn.net/giousa/article/details/72846303#t2

    Android使用Netty网络框架实践(客户端、服务端)

    Android Studio 开发Netty网络访问框架,实现了客户端、服务端两种访问方式,支持发送心跳数据,使用Handler实现外部数据交互,有调用Demo,在实际项目中使用暂时没有问题

    netty+websocket实现心跳和断线重连

    实现netty作为服务端,websocket连接成功,将channel保存到map集合,通过js发送心跳,服务端接收心跳信息并响应给客户端,当服务端断开时 客户端进行重连操作

    Netty实现私有协议,模仿dubbo单一长连接RPC服务调用,心跳检测机制源码

    1、客户端与服务端基于单一长连接进行通信,客户端一条连接被多个线程使用。 2、实现私有协议 请求协议 协议头:4字节的请求序列号 2字节的请求类型 2字节数据包长度 数据部分: 服务调用:2字节请求服务名...

    断网断电心跳检测

    断电断网的心跳检测,完美解决了websocket断电断网之后服务端不能收到关闭的通知,倒置客户端不能收到信息

    NettySocket同步数据获取实现

    NettySocket同步数据获取实现,并实现了心跳检测,客户端连接控制,客户端登陆等。 Demo代码

    springboot netty4 及时通讯(带心跳检测,zookeeper)

    netty 底层,实现心跳检测,下线重连,发送消息.服务端注册到zookeeper,客户端连接服务端

    基于netty实现采用自定义协议方式通讯,同时支持心跳机制和重连机制

    - 服务端采用 `IdleStateHandler`,在一段时间内(默认15s)没有读到客户端消息则说明客户端已离线,服务器会触发读超时事件断开连接 - 客户端采用定时(默认10s)任务方式向服务端发送一个ping消息作为心跳包,避免...

    Netty 入门与实战:仿写微信 IM 即时通讯系统.rar

    Netty 入门与实战:仿写微信 IM 即时通讯系统,掘金小册子,netty教程。章节齐全无缺失,排版非常不错。 1.仿微信IM系统简介 1 2.Netty是什么? 2 3.服务端启动流程 8 4.客户端启动流程 11 ...16.心跳与空闲检测 25

    netty断线重连机制及心跳机制.rar

    netty断线重连机制及心跳,包含客户端和服务端,主要学习怎么重连和发送接收心跳,不满足心跳则关闭管道。

    基于Netty框架的demo项目

    这是一个基于高并发网络框架-Netty框架的demo项目,旨在展示Netty服务端与客户端的基础使用方式,并深入探讨了自定义编解码器以及心跳机制的实现。本demo紧密结合了本人发布的《初识Netty》一文中的示例,为学习者...

    unity3d 游戏客户端源码(v1.0.3)

    2. 使用Netty实现Socket异步长连接游戏服务器 3. 解决了网络通信的断包粘包问题 4. 实现了消息的序列化与反序列化 5. 实现客户端连接认证 6. 实现客户端与服务器通信消息的加密与解密 7. 实现客户端与服务器通信消息...

    TCP自定义通讯协议参考

    •每个客户端仅能使用一个长连接连接服务端。 •客户端与服务端通讯之前需要在服务端配置用户名密码。 •每次建立连接需要发送登录信息,用于确定消息与客户端关系。 •通讯链路需考虑心跳保持,心跳间隔4分钟。 •...

    NettyAndroid,Netty在Android中的使用

    Netty在Android开发中的应用实战系列(一)——— 搭建服务端与客户端:https://azhon.blog.csdn.net/article/details/100569489 Netty在Android开发中的应用实战系列(二)——— Encoder | Decoder | Handler 的...

    SpringBoot整合Netty心跳机制过程详解

    主要介绍了SpringBoot整合Netty心跳机制过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    pacebox-netty 是一个基于netty和pacebox-core封装的便捷工具包.rar

    Hermes是一款基于Netty的可以支持百万级别的并发连接的高性能、高度可扩展的的网络通讯框架,它参考了dubbo和sofa-bolt的网络通讯模块的设计,hemers可以使用在IM、长连接等领域,它具有以下的特性: 私有的通讯...

    基于Java+Netty+MySQL实现联机版坦克大战【100011266】

    使用Netty实现客户端和服务端之间的通讯 使用Marshalling作为编解码技术 游戏界面使用java自带的swing与awt进行编写 使用Spring的依赖注入与java的反射机制简化了消息类型的判定 使用Mysql数据库 使用log4j记录日志 ...

    mqttserver:基于netty实现mqtt协议 服务器端开发

    mqttserver,基于netty 4.1.1,可解码http、mqtt协议请求。...连接断开时,客户端的离线状态处理; 心跳超时处理; 消息发布/订阅处理; 4.HttpServerHandler类实现对http消息的自定义处理。该handle类包含以下

    netty-websocket.zip

    需求:服务端主动向客户端推送消息,服务端配置心跳,根据读写设置心跳时间,定时判断客户端是否有消息互动,客户端定时推送消息至服务端,如果时间段内未推送,则判断为网络断开或其他原因,服务端将长连接断开。

    netty入门示例工程

    本工程采用maven+netty4.1.0+PrefixedStringDecoder+json技术,包括客户端和服务端。先运行服务端SampleServer,再去等客户端SampleClient。示例中发的是心跳包,其中消息格式定义为msgType + msgNo + content(json...

Global site tag (gtag.js) - Google Analytics