`
luciferdevil
  • 浏览: 3075 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

基于Java NIO的即时聊天服务器雏形

阅读更多

前不久自己动手写了一个Android的聊天工具,跟服务器的交互还是基于HTTP方式的,在一般通讯上还算凑活,但是在即时聊天的时候就有点恶心了,客户端开启Service每隔3秒去询问服务器是否有自己的新消息(当然3秒有点太快了),在心疼性能和流量的前提下,只能自己动手写个服务器,传统的Socket是阻塞的,这样的话服务器对每个Socket都需要建立一个线程来操作,资源开销很大,而且线程多了直接会影响服务端的性能(曾经测试开了3000多个线程就不让创建了,所以并发数目也是有限制的),听说从JDK1.5就多了个New IO,灰常不错的样子,找了找相关的资料,网上竟然全都是最最最简单的一个demo,然后去CSDN发帖,基本上都是建议直接使用MINA框架的,这样一来根本达不到学习NIO的目的,而且现在的技术也太快餐了,只知道使用前辈留下的东西,知其然不知其所以然。

折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。本Demo暂时实现了Session过期检测,心跳检测有空再搞,如果本例子在使用过程中有性能漏洞或者什么bug请及时通知我,谢谢

 

一开始把文章发博客园了,结果发现Java基本成悲剧了,还是回JavaEye吧……

 

废话不多说,关于NIOSelectionKeySelectorChannel网上的介绍例子都很多,直接上代码:

 

JsonParser

Json的解析类,随便封装了下,使用的最近比较火的fastjson

1 public class JsonParser {
2     
3     private static JSONObject mJson;
4     
5     public synchronized static String get(String json,String key) {
6         mJson = JSON.parseObject(json);
7         return mJson.getString(key);
8     }
9 }

Main

入口,不解释

1 public class Main {
2 
3     public static void main(String... args) {
4         new SeekServer().start();
5     }
6 }

Log

1 public class Log {
2 
3     public static void i(Object obj) {
4         System.out.println(obj);
5     }
6     public static void e(Object e) {
7         System.err.println(e);
8     }
9 }

SeekServer:

服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的

 1 public class SeekServer extends Thread{
 2     private final int ACCPET_PORT = 55555;
 3     private final int TIME_OUT = 1000;
 4     private Selector mSelector = null;
 5     private ServerSocketChannel mSocketChannel = null;
 6     private ServerSocket mServerSocket = null;
 7     private InetSocketAddress mAddress = null;
 8     
 9     public SeekServer() {
10         long sign = System.currentTimeMillis();
11         try {
12             mSocketChannel = ServerSocketChannel.open();
13             if(mSocketChannel == null) {
14                 System.out.println("can't open server socket channel");
15             }
16             mServerSocket = mSocketChannel.socket();
17             mAddress = new InetSocketAddress(ACCPET_PORT);
18             mServerSocket.bind(mAddress);
19             Log.i("server bind port is " + ACCPET_PORT);
20             mSelector = Selector.open();
21             mSocketChannel.configureBlocking(false);
22             SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
23             key.attach(new Acceptor());
24             
25             //检测Session状态
26             Looper.getInstance().loop();
27             
28             //开始处理Session
29             SessionProcessor.start();
30             
31             Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");
32         } catch (ClosedChannelException e) {
33             Log.e(e.getMessage());
34         } catch (IOException e) {
35             Log.e(e.getMessage());
36         } 
37     }
38     
39     public void run() {
40         Log.i("server is listening...");
41         while(!Thread.interrupted()) {
42             try {
43                 if(mSelector.select(TIME_OUT) > 0) {
44                     Set<SelectionKey> keys = mSelector.selectedKeys();
45                     Iterator<SelectionKey> iterator = keys.iterator();
46                     SelectionKey key = null;
47                     while(iterator.hasNext()) {
48                         key = iterator.next();
49                         Handler at = (Handler) key.attachment();
50                         if(at != null) {
51                             at.exec();
52                         }
53                         iterator.remove();
54                     }
55                 }
56             } catch (IOException e) {
57                 Log.e(e.getMessage());
58             }
59         }
60     }
61 
62     class Acceptor extends Handler{
63 
64         public void exec(){
65             try {
66                 SocketChannel sc = mSocketChannel.accept();
67                 new Session(sc, mSelector);
68             } catch (ClosedChannelException e) {
69                 Log.e(e);
70             } catch (IOException e) {
71                 Log.e(e);
72             }
73         }
74     }
75 }

Handler

只有一个抽象方法execSession将会继承它

1 public abstract class Handler {
2     
3     public abstract void exec();
4 }

Session

封装了用户的请求和SelectionKeySocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。

  1 public class Session extends Handler{
  2 
  3     private SocketChannel mChannel;
  4     private SelectionKey  mKey;
  5     private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);  
  6     private Charset charset = Charset.forName("UTF-8");
  7     private CharsetDecoder mDecoder = charset.newDecoder();
  8     private CharsetEncoder mEncoder = charset.newEncoder();
  9     private long lastPant;//最后活动时间
 10     private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间
 11     private String key;
 12     
 13     private String sendData = "";
 14     private String receiveData = null;
 15     
 16     public static final int READING = 0,SENDING = 1;
 17     int mState = READING;
 18     
 19     public Session(SocketChannel socket, Selector selector) throws IOException {
 20         this.mChannel = socket;
 21         mChannel = socket;
 22         mChannel.configureBlocking(false);
 23         mKey = mChannel.register(selector, 0);
 24         mKey.attach(this);
 25         mKey.interestOps(SelectionKey.OP_READ);
 26         selector.wakeup();
 27         lastPant = Calendar.getInstance().getTimeInMillis();
 28     }
 29     
 30     public String getReceiveData() {
 31         return receiveData;
 32     }
 33     
 34     public void clear() {
 35         receiveData = null;
 36     }
 37 
 38     public void setSendData(String sendData) {
 39         mState = SENDING;
 40         mKey.interestOps(SelectionKey.OP_WRITE);
 41         this.sendData = sendData + "\n";
 42     }
 43 
 44     public boolean isKeekAlive() {
 45         return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
 46     }
 47     
 48     public void setAlive() {
 49         lastPant = Calendar.getInstance().getTimeInMillis();
 50     }
 51     
 52     /**
 53      * 注销当前Session
 54      */
 55     public void distroy() {
 56         try {
 57             mChannel.close();
 58             mKey.cancel();
 59         } catch (IOException e) {}
 60     }
 61     
 62     @Override
 63     public synchronized void exec() {
 64         try {
 65             if(mState == READING) {
 66                 read();
 67             }else if(mState == SENDING) {
 68                 write();
 69             }
 70         } catch (IOException e) {
 71             SessionManager.remove(key);
 72             try {
 73                 mChannel.close();
 74             } catch (IOException e1) {
 75                 Log.e(e1);
 76             }
 77             mKey.cancel();
 78         }
 79     }
 80     
 81     public void read() throws IOException{
 82         mRreceiveBuffer.clear();
 83         int sign = mChannel.read(mRreceiveBuffer);
 84         if(sign == -1) { //客户端连接关闭
 85             mChannel.close();
 86             mKey.cancel();
 87         }
 88         if(sign > 0) {
 89             mRreceiveBuffer.flip();
 90             receiveData = mDecoder.decode(mRreceiveBuffer).toString();
 91             setAlive();
 92             setSign();
 93             SessionManager.addSession(key, this);
 94         }
 95     }
 96     
 97     private void setSign() {
 98         //设置当前Session的Key
 99         key = JsonParser.get(receiveData,"imei");
100         //检测消息类型是否为心跳包
101 //        String type = jo.getString("type");
102 //        if(type.equals("HEART_BEAT")) {
103 //            setAlive();
104 //        }
105     }
106     
107     
108     /**
109      * 写消息
110      */
111     public void write() {
112         try {
113             mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
114             sendData = null;
115             mState = READING;
116             mKey.interestOps(SelectionKey.OP_READ);
117         } catch (CharacterCodingException e) {
118             e.printStackTrace();
119         } catch (IOException e) {
120             try {
121                 mChannel.close();
122             } catch (IOException e1) {
123                 Log.e(e1);
124             }
125         }
126     }
127 }

 

SessionManager

将所有Session存放到ConcurrentHashMap这里使用手机用户的imeikeyConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,

封装了一些操作Session的方法例如getremove

 1 public class SessionManager {
 2 
 3     private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
 4     
 5     public static void addSession(String key,Session session) {
 6         sessions.put(key, session);
 7     }
 8     
 9     public static Session getSession(String key) {
10         return sessions.get(key);
11     }
12     
13     public static Set<String> getSessionKeys() {
14         return sessions.keySet();
15     }
16     
17     public static int getSessionCount() {
18         return sessions.size();
19     }
20     
21     public static void remove(String[] keys) {
22         for(String key:keys) {
23             if(sessions.containsKey(key)) {
24                 sessions.get(key).distroy();
25                 sessions.remove(key);
26             }
27         }
28     }
29     public static void remove(String key) {
30         if(sessions.containsKey(key)) {
31             sessions.get(key).distroy();
32             sessions.remove(key);
33         }
34     }
35 }

SessionProcessor

里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequestSocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是requestresponse

 1 public class SessionProcessor implements Runnable{
 2     
 3     private static Runnable processor = new SessionProcessor();
 4     private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
 5     public static void start() {
 6         new Thread(processor).start();
 7     }
 8     
 9     @Override
10     public void run() {
11         while(true) {
12             Session tmp = null;
13             for(String key:SessionManager.getSessionKeys()) {
14                 tmp = SessionManager.getSession(key);
15                 //处理Session未处理的请求
16                 if(tmp.getReceiveData() != null) {
17                     pool.execute(new Process(tmp));
18                 }
19             }
20             try {
21                 Thread.sleep(10);
22             } catch (InterruptedException e) {
23                 Log.e(e);
24             }
25         }
26     }
27     
28     class Process implements Runnable {
29 
30         private SocketRequest request;
31         private SocketResponse response;
32         
33         public Process(Session session) {
34             //将Session封装成Request和Response
35             request = new SocketRequest(session);
36             response = new SocketResponse(session);
37         }
38         
39         @Override
40         public void run() {
41             new RequestTransform().transfer(request, response);
42         }
43     }
44 
45 }

RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandlerMessageHandler

 1 public class RequestTransform {
 2 
 3     public void transfer(SocketRequest request,SocketResponse response) {
 4         String action = request.getValue("action");
 5         String handlerName = request.getValue("handler");
 6         //根据Session的请求类型,让不同的类方法去处理
 7         try {
 8             Class<?> c= Class.forName("com.seek.server.handler." + handlerName);
 9             Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
10             Method method=c.getMethod(action,arg);
11             method.invoke(c.newInstance(), new Object[]{request,response});
12         } catch (Exception e) {
13             e.printStackTrace();
14         }
15     }
16 }

SocketRequestSocketResponse

 1 public class SocketRequest {
 2 
 3     private Session mSession;
 4     private String  mReceive;
 5     
 6     public SocketRequest(Session session) {
 7         mSession = session;
 8         mReceive = session.getReceiveData();
 9         mSession.clear();
10     }
11     
12     public String getValue(String key) {
13         return JsonParser.get(mReceive, key);
14     }
15     
16     public String getQueryString() {
17         return mReceive;
18     }
19 }
 1 public class SocketResponse {
 2 
 3     private Session mSession;
 4     public SocketResponse(Session session) {
 5         mSession = session;
 6     }
 7     
 8     public void write(String msg) {
 9         mSession.setSendData(msg);
10     }
11 }

 

最后则是两个处理请求的Handler

1 public class UserHandler {
2 
3     public void login(SocketRequest request,SocketResponse response) {
4         System.out.println(request.getQueryString());
5         //TODO: 处理用户登录
6         response.write("你肯定收到消息了");
7     }
8 }
1 public class MessageHandler {
2     public void send(SocketRequest request,SocketResponse response) {
3         System.out.println(request.getQueryString());
4         //消息发送
5         String key = request.getValue("imei");
6         Session session = SessionManager.getSession(key);
7         new SocketResponse(session).write(request.getValue("sms"));
8     }
9 }

 

还有个监测是否超时的类Looper,定期去删除Session

 1 public class Looper extends Thread{
 2     private static Looper looper = new Looper();
 3     private static boolean isStart = false;
 4     private final int INTERVAL = 1000 * 60 * 5;
 5     private Looper(){}
 6     public static Looper getInstance() {
 7         return looper;
 8     }
 9     
10     public void loop() {
11         if(!isStart) {
12             isStart = true;
13             this.start();
14         }
15     }
16     
17     public void run() {
18         Task task = new Task();
19         while(true) {
20             //Session过期检测
21             task.checkState();
22             //心跳包检测
23             //task.sendAck();
24             try {
25                 Thread.sleep(INTERVAL);
26             } catch (InterruptedException e) {
27                 Log.e(e);
28             }
29         }
30     }
31 }
 1 public class Task {
 2     public void checkState() {
 3         Set<String> keys = SessionManager.getSessionKeys();
 4         if(keys.size() == 0) {
 5             return;
 6         }
 7         List<String> removes = new ArrayList<String>();
 8         Iterator<String> iterator = keys.iterator();
 9         String key = null;
10         while(iterator.hasNext()) {
11             key = iterator.next();
12             if(!SessionManager.getSession(key).isKeekAlive()) {
13                 removes.add(key);
14             }
15         }
16         if(removes.size() > 0) {
17             Log.i("sessions is time out,remove " + removes.size() + "session");
18         }
19         SessionManager.remove(removes.toArray(new String[removes.size()]));
20     }
21     
22     public void sendAck() {
23         Set<String> keys = SessionManager.getSessionKeys();
24         if(keys.size() == 0) {
25             return;
26         }
27         Iterator<String> iterator = keys.iterator();
28         while(iterator.hasNext()) {
29             iterator.next();
30             //TODO 发送心跳包
31         }
32     }
33 }

 

注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,

因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -!

 

客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了

 

代码在家里的机器上,回去补上,请各位大大指证 - -!

 

分享到:
评论

相关推荐

    基于Java NIO实现五子棋游戏.zip

    基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现...

    基于Java NIO的网络服务器Netty生产实例.zip

    基于Java NIO的网络服务器Netty生产实例

    java nio 聊天室源码

    此项目基于java nio实现聊天室功能

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

    非GUI聊天室基于javaNIO

    服务端程序基于javaNIO,客户端程序基于旧IO,读完&lt;&lt;javaNIO&gt;&gt;后,导入eclipse即可运行,支持多人在线聊天,上下线通知.PS:非GUI程序,毕竟javaSwing用的少,不懂的地方大家可以一起讨论,评论必回!

    基于java NIO的简单聊天软件示例

    JAVA NIO有两种解释:一种叫非阻塞IO(Non-blocking I/O),另一种也叫新的IO(New I/O),其实是同一个概念。它是一种同步非阻塞的I/O模型,也是...本例是使用java nio实现的简单聊天系统,界面简单,旨在学习java nio

    java nio 通信服务器、客户端完整例子

    用java编写的nio通信的例子,nio是io编程的新版本,比io较流行。同时本例子是适用socket通信的。可以在此基础上,添加您的个人应用。本例子适用于:java通信的学习者,android平台通信的学习者。

    java NIO和java并发编程的书籍

    java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...

    基于nio的简易聊天室

    基于nio 简易聊天室的服务端 客户端,有界面

    JavaNIO chm帮助文档

    Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...

    java nio聊天室源码

    用java nio写的一个完整的、可运行的聊天室程序;复制代码到项目后可直接运行。

    Java NIO 在并发型服务器设计中的应用

    Java NIO 在并发型服务器设计中的应用Java NIO 在并发型服务器设计中的应用Java NIO 在并发型服务器设计中的应用

    高手使用Java NIO编写高性能的服务器

    使用Java NIO编写高性能的服务器

    基于java NIO的socket通信demo

    java NIO 创建的服务端,能够异步响应客户端的请求,客户端采用nio异步请求服务端,通信之间的乱码使用charset解决

    java NIO socket聊天室

    使用NIO socket不需要多线程来处理多个连接的请求,效率非常高 可以作为NIO socket入门的例子,Reactor模式,重点理解key.attach, jar文件里包含了源代码 1,运行server.bat启动服务器,可以打开编辑,修改端口号 ...

    java NIO socket聊天

    java NIO 高性能 socket通讯,服务端采用单线程,降低了cpu的压力,普通io socket通讯,server需要每个连接运行个线程,容易出现问题,效率也低

    nio-chatroom:基于Java Nio的单服务器多客户端聊天室

    聊天室基于Java Nio的单服务器多客户端聊天室实施规范服务器来自客户端的所有请求都显示在服务器端服务器处理连接/断开而没有其他客户端告知所有客户变更客户端显示给所有用户的在线用户列表向所有用户显示连接/断开...

    Java NIO英文高清原版

    Java NIO英文高清原版

    java NIO 视频教程

    Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers(通道和缓冲区) 标准的IO基于字节流和字符流进行操作的,...

    java nio 包读取超大数据文件

    Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据...

Global site tag (gtag.js) - Google Analytics