最近在开发一个c/s项目,主要功能是:客户端负责采集数据发送给服务端入库。
客户端与服务端交互大致过程如下:
客户端发送一次数据,服务端read一次数据并解析入库。
先描述下问题,后面会贴出较为详细的设计和代码:
调试环境:
客户端与服务端在同一台pc机(排除丢包的可能性)
问题:
该项目在调试时,会
偶然出现一个问题
:
如:
客户端
一次性发送9793B的数据,服务端nio在reactor触发的一次readable事件里读取channel中的流数据,当该次readable事件的流数据读取完毕后,发现总数据量才4380B,小于客户端所发送的9793B,剩下的数据在紧接下来的一次readable事件中被读取出来。
因此,由于客户端所发送的一个数据包,服务端分为2次或多次接收,导致该数据包后续的处理均出现异常。
由于最初设计时,完全没考虑到这样的情况,也没有发现谁碰过类似的问题,导致现在改动代码会比较麻烦,还望各位英雄豪杰帮忙分析分析。
该项目的大体设计和工作过程如下:
客户端:
客户端并不使用nio,只是使用普通socket连接服务端(因为我没发现是否有区别,难道问题原因就在此?);
客户端将采集到的数据,定时发送给服务端,发送的
数据量大小是不固定的;
发送部分代码如下:
public void send2server(int command, byte[] msg) throws IOException
{
if (out != null)
{
msg = MessageFilter.addHeader(command, msg);
_log.info("发送字节数:" + msg.length);
//msg为byte[]类型
out.write(msg);
out.flush();
}
else
{
throw new IOException("输出流为空");
}
}
服务端:
服务端通过nio触发readable事件来
一次性读取channel中的数据、解析数据并入库。
服务端nio部分代码如下:
try
{
// 处理IO事件
if (key.isAcceptable())
accept(key);
else if (key.isReadable())
{
_log.debug("发现读IO");
//与此问题相关的关键代码为这句
Reader2.processKey(key); // 提交读服务线程读取客户端数据
SocketChannel sc = (SocketChannel) key.channel();
//如果为长连接
if (sc.socket().getKeepAlive())
{
key.interestOps(key.interestOps()
& ~SelectionKey.OP_READ);
_log.debug("移除兴趣读");
}
else
key.cancel();
}
else if (key.isWritable())
{
_log.debug("发现写IO");
SocketChannel sc = (SocketChannel) key.channel();
Writer.processRequest(key); // 提交写服务线程向客户端发送回应数据
//如果为长连接
if (sc.socket().getKeepAlive())
key.interestOps(SelectionKey.OP_READ);
else
key.cancel();
}
}
catch (Exception e)
{
key.cancel();
_log.info("处理key出错,连接可能意外中断了");
e.printStackTrace();
}
//与此问题相关的关键代码为这句
Reader2.processKey(key); // 提交读服务线程读取客户端数据
Reader2类代码为:
package com.gdtec.nmt.nioserver.io;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
import com.gdtec.nmt.filter.MessageFilter;
import com.gdtec.nmt.nioserver.InterestInfo;
import com.gdtec.nmt.nioserver.Notifier;
import com.gdtec.nmt.nioserver.Request;
import com.gdtec.nmt.nioserver.Server;
import com.gdtec.nmt.pojo.ClientInfo;
import com.gdtec.nmt.pojo.MessagePackage;
/**
* <p>Title: 读线程</p>
* <p>Description: 该线程用于读取客户端数据</p>
* @author zhuhongzheng
* @version 1.0
*/
public class Reader2 extends Thread {
private static Logger _log = Logger.getLogger(Reader2.class);
private static List<Request> requestsPool = new LinkedList<Request>();
private static Notifier notifier = Notifier.getNotifier();
public Reader2() {
}
public void run() {
while (true) {
try {
Request request;
synchronized (requestsPool) {
while (requestsPool.isEmpty()) {
requestsPool.wait();
}
request = requestsPool.remove(0);
}
// 读取数据
processRequest(request);
}
catch (Exception e) {
_log.info("读取池出现异常!", e);
}
}
}
private static int BUFFER_SIZE = 20480;
/**
* 读取客户端发出请求数据
* @param sc 套接通道
* @return
* @throws IOException
*/
private static byte[] readInput(SocketChannel sc) throws IOException
{
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
int off = 0;
int r = 0;
byte[] data = new byte[BUFFER_SIZE];
//与此问题相关的关键代码,在稍后我贴的日志可以看出,在这里只read了4380B的数据,这个while循环,只进行了一次
//何解?
while ((r = sc.read(buffer)) > 0)
{
_log.debug("发现数据:" + r);
if ((off + r) > data.length)
{
data = grow(data, BUFFER_SIZE * 2);
// System.out.println("容量扩展为:" + data.length);
_log.debug("容量扩展为:" + data.length);
}
byte[] buf = buffer.array();
System.arraycopy(buf, 0, data, off, r);
off += r;
buffer.clear();
_log.debug("共读了:" + off);
}
String memoryMsg = ", freeMemory="
+ (Runtime.getRuntime().freeMemory() / (1024)
+ "k,totalMemory="
+ (Runtime.getRuntime().totalMemory() / (1024)) + "k");
_log.info("共读了:" + off + memoryMsg);
byte[] req = new byte[off];
System.arraycopy(data, 0, req, 0, off);
return req;
}
/**
* 处理连接数据读取
* @param request Request
*/
public static void processRequest(Request request) {
try {
// 读取客户端数据
byte[] received = request.getDataInputByte();
_log.debug("接收到字节数:" + received.length);
//System.out.println("reader 收到数据包:" + new String(received, "GBK"));
MessagePackage msgPackage = MessageFilter.decodeMessage(received);
// String clientData = readInput(sc);
String clientData = null;
//数据不完整或者数据为空,则不做处理
if (msgPackage == null || null == msgPackage.getBody())
{
throw new Exception("数据包格式有问题,请检查");
}
clientData = msgPackage.getBody();
request.setDataInput(clientData);
request.setDataInputByte(new byte[0]);
request.setParameter("command", msgPackage.getCommand());
request.setParameter("bodyLength", msgPackage.getBodyLength());
// 触发onRead
notifier.fireOnRead(request);
}
catch (Exception e)
{
_log.error(e);
}
try
{
//SocketChannel 在accept事件中被置入request对象
SocketChannel sc = request.getSc();
if (sc == null)
return;
//如果是长连接模式,read完了就要把write事件注册到selector,否则无法触发该channel的write事件
if (sc.socket().getKeepAlive())
{
Server.registerInterest(new InterestInfo(sc,
SelectionKey.OP_WRITE, request));
}
}
catch (SocketException e)
{
_log.error("注册写兴趣时出现异常!", e);
}
}
public static void processKey(SelectionKey key)
{
SocketChannel sc = (SocketChannel) key.channel();
Request request = (Request) key.attachment();
try
{
byte[] received = readInput(sc);
if(received.length==0)
{
// TODO:收到空数据。。怎么回事;
return ;
}
_log.debug("接收到字节数:" + received.length);
request.setDataInputByte(received);
put2RequestPool(request);
}
catch (IOException e1)
{
key.cancel();
_log.error(e1);
ClientInfo client = (ClientInfo) request.getParameter("ClientInfo");
Server.clientConnectionError(client);
}
catch (Exception e)
{
_log.error(e);
}
}
/**
* 处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理
*/
public static void put2RequestPool(Request request) {
synchronized (requestsPool) {
requestsPool.add(requestsPool.size(), request);
requestsPool.notifyAll();
}
}
/**
* 数组扩容
* @param src byte[] 源数组数据
* @param size int 扩容的增加量
* @return byte[] 扩容后的数组
*/
public static byte[] grow(byte[] src, int size) {
byte[] tmp = new byte[src.length + size];
System.arraycopy(src, 0, tmp, 0, src.length);
return tmp;
}
}
//与此问题相关的关键代码,在稍后我贴的日志可以看出,在这里只read了4380B的数据,这个while循环,只进行了一次
//何解?
while ((r = sc.read(buffer)) > 0)
日志内容:
客户端部分日志:
引用
2010-11-16 18:53:10[INFO]ClientMain.java 369■Thread-2■: 发送字节数:9793
2010-11-16 18:53:10[INFO]ClientWriter.java 166■Thread-2■: 缓冲区已满,发送数据,清空缓冲区
服务端日志:
引用
2010-11-16 18:51:16[INFO]Reader2.java 121■main■: 发现数据:9600
2010-11-16 18:51:16[INFO]Reader2.java 121■main■: 发现数据:4441
2010-11-16 18:51:16[INFO]Reader2.java 138■main■: 共读了:14041, freeMemory=2936k,totalMemory=5056k
2010-11-16 18:51:16[INFO]ResultsHandler.java 271■Thread-11■: 保存了111条数据
2010-11-16 18:53:10[INFO]Reader2.java 121■main■: 发现数据:4380
2010-11-16 18:53:10[INFO]Reader2.java 138■main■: 共读了:4380, freeMemory=2090k,totalMemory=5056k
2010-11-16 18:53:10[ERROR]ResultsHandler.java 280■Thread-11■: 保存任务结果出现异常:
java.sql.BatchUpdateException: '127.0' 附近有语法错误。
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeBatch(SQLServerStatement.java:1693)
at com.gdtec.nmt.nioserver.Handler.ResultsHandler.saveTasks(ResultsHandler.java:269)
at com.gdtec.nmt.nioserver.Handler.ResultsHandler.onRead(ResultsHandler.java:105)
at com.gdtec.nmt.nioserver.Notifier.fireOnRead(Notifier.java:71)
at com.gdtec.nmt.nioserver.io.Reader2.processRequest(Reader2.java:175)
at com.gdtec.nmt.nioserver.io.Reader2.run(Reader2.java:51)
2010-11-16 18:53:10[INFO]Reader2.java 121■main■: 发现数据:5413
2010-11-16 18:53:10[INFO]Reader2.java 138■main■: 共读了:5413, freeMemory=1056k,totalMemory=5056k
2010-11-16 18:53:10[ERROR]Reader2.java 180■Thread-11■: java.lang.NumberFormatException: For input string: "0A4E-D6DF-27D8-C444E3A3EA67'"
2010-11-16 18:53:21[INFO]Reader2.java 121■main■: 发现数据:4380
2010-11-16 18:53:21[INFO]Reader2.java 121■main■: 发现数据:4493
2010-11-16 18:53:21[INFO]Reader2.java 138■main■: 共读了:8873, freeMemory=1306k,totalMemory=5056k
2010-11-16 18:53:21[INFO]ResultsHandler.java 271■Thread-11■: 保存了68条数据
几乎每次出错都是因为服务端只读了4380B,从日志中可以看到,客户端发送了9793B,服务端第一次readable只发现了4380B,下一次的readable发现5413B,加起来刚好等于客户端发送的9793B,由此可知,一次readable事件并不一定能将io流读取完,可能会分多次readable事件读取。这又让我感觉越来越不理解nio底层原理了,reactor触发readable事件的条件是什么?
分享到:
相关推荐
一个NIO服务端,客户端的例子
JAVA.NIO 异步长连接客户端与服务端都有,大家可以看看,另不知道怎么样将客户端读取的BUFF后的数据进行处理可以给出修改吗?
Java NIO非阻塞服务端与客户端相互通信 每行代码都有注释, 看完后,会让你对非阻塞有一清楚的认识.
该JAVA NIO项目包含server服务端完整项目源码、client客户端项目工程源码。
NIO实现客户端与客户端之间的通信,通过中心服务进行消息转发。
socket编程 java代码示例,客户端发送消息,服务端接收
这是一个用java NIO 实现的简单多线程服务器有客户端例子,仅供学习参考。
Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据...
服务器在JAVA上,客户端为C++,实现多人聊天消息转发,服务器也可以给客户端发送消息。附上项目完整源代码,client包负责消息发送,sensor包负责消息接收
利用socketNIO实现的多客户端聊天室,非阻塞式IO,java代码编写,使用方法:先启动服务端代码再启动客户端代码,可启动多个客户端代码。若使用多个电脑启动客户端,需在客户端代码中更改一下ip地址。
JAVA.NIO 异步长连接客户端与服务端都有,大家可以看看,另不知道怎么样将客户端读取的BUFF后的数据进行处理可以给出修改吗?
由于NIO并无按行读写数据的功能,试着实现了下,并与普通IO进行了对比。
多线程NIO客户端实例
客户端: package com.wcc.a_tcpnio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import ...
详情查看博客地址详情查看博客地址详情查看博客地址详情查看博客地址详情查看博客地址http://blog.csdn.net/g290095142/article/details/77848088
如果 到达 fbb的结尾 还没有结束,就再通过nio读取一段字节,继续处理。 由于对于本程序 116个字节以上的行才有意义,所以 在next实现方法中,有对 116 长度的判断,否则返回 null 修正了之前版本中的问题: 修正...
在CMD运行模式下实现的JAVA异步通信,采用Java.nio包,使用线程实现
用java编写的nio通信的例子,nio是io编程的新版本,比io较流行。同时本例子是适用socket通信的。可以在此基础上,添加您的个人应用。本例子适用于:java通信的学习者,android平台通信的学习者。
本类,是专门为了处理大文件... 如果 到达 fbb的结尾 还没有结束,就再通过nio读取一段字节,继续处理。 由于对于本程序 116个字节以上的行才有意义,所以 在next实现方法中,有对 116 长度的判断,否则返回 null
3.读消息消费者线程池中派遣一个就绪线程从客户端读取数据包DataPacket,并触发onReaded事件 4.开发人员在onReaded事件编写逻辑代码,并且决定读入的数据包的去向(返回给客户端,还是结束) 5.若在onReaded事件里,...