`
cuisuqiang
  • 浏览: 3935683 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
3feb66c0-2fb6-35ff-968a-5f5ec10ada43
Java研发技术指南
浏览量:3650303
社区版块
存档分类
最新评论

ByteBuffer 到底怎么用?网络编程中一点总结!

    博客分类:
  • JDK
阅读更多

做tcp网络编程,要解析一批批的数据,可是数据是通过Socket连接的InputStream一次次读取的,读取到的不是需要转换的对象,而是要直接根据字节流和协议来生成自己的数据对象。

按照之前的编程思维,总是请求然后响应,当然Socket也是请求和响应,不过与单纯的请求响应是不同的。

这里Socket连接往往是要保持住的,也就是长连接,然后设置一个缓冲区,网络流不断的追加到缓冲区。然后后台去解析缓冲区的字节流。

如图所示,网络的流一直在传递,我们收到也许是完成的数据流,也可能是没有传递完的。这里就需要监视管道,不断读取管道中的流数据,然后向缓冲区追加。程序从头开始解析,如果目前缓冲区包含了数据,则解析,没有则放弃继续读取管道流。

就算管道中包含了数据,也不一定包含了完成的数据。例如,100个字节是一个数据体,可是目前缓冲区内包含了120个字节,这就是说缓冲区包含了一条数据,但是还有没有传递完的字节流。那么就要把前100个字节拿出来解析,然后从缓冲区清除这100个字节。那缓冲区就剩下20个字节了,这些数据可能在下次流中补充完成。

如何建立缓冲?

/**
 * 全局MVB数据缓冲区 占用 1M 内存
 */
private static ByteBuffer bbuf = ByteBuffer.allocate(10240);

/**
 * 线程安全的取得缓冲变量
 */
public static synchronized ByteBuffer getByteBuffer() {
	return bbuf;
}

 写一个Socket客户端,该客户端得到Socket连接,然后读取流,一直向缓冲中追加字节流,每次追加后调用一个方法来解析该流

public void run() {
	Socket socket = GlobalClientKeep.mvbSocket;
	if (null != socket) {
		try {
			// 获得mvb连接引用
			OutputStream ops = socket.getOutputStream();
			InputStream ips = socket.getInputStream();
			while (true) {
				if (null != ops && null != ips) {
					// 接收返回信息
					byte[] bt = StreamTool.inputStreamToByte(ips);
					ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer();
					// 设置到缓冲区中
					bbuf.put(bt);
					// ////////////////////////////////////////////////////////////////////////
					// 拆包解析方法
					splitByte(ops);
					ops.flush();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	} else {
		// 如果连接存在问题,则必须重新建立
		GlobalClientKeep.initMvbSocket();
	}
}

 

关于如何读取流,我有一篇博客专门讲解了所以这里是直接调用方法

byte[] bt = StreamTool.inputStreamToByte(ips);

 那么解析方法是如何做的?

解析方法首先获得该缓冲中的所有可用字节,然后判断是否符合一条数据条件,符合就解析。如果符合两条数据条件,则递归调用自己。其中每次解析一条数据以后,要从缓冲区中清除已经读取的字节信息。

/**
 * @说明 拆包解析方法
 */
public static void splitByte(OutputStream ops) {
	try {
		ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer();
		int p = bbuf.position();
		int l = bbuf.limit();
		// 回绕缓冲区 一是将 curPointer 移到 0, 二是将 endPointer 移到有效数据结尾
		bbuf.flip();
		byte[] byten = new byte[bbuf.limit()]; // 可用的字节数量
		bbuf.get(byten, bbuf.position(), bbuf.limit()); // 得到目前为止缓冲区所有的数据
		// 进行基本检查,保证已经包含了一组数据
		if (checkByte(byten)) {
			byte[] len = new byte[4];
			// 数组源,数组源拷贝的开始位子,目标,目标填写的开始位子,拷贝的长度
			System.arraycopy(byten, 0, len, 0, 4);
			int length = StreamTool.bytesToInt(len); // 每个字节流的最开始肯定是定义本条数据的长度
			byte[] deco = new byte[length]; // deco 就是这条数据体
			System.arraycopy(byten, 0, deco, 0, length);
			// 判断消息类型,这个应该是从 deco 中解析了,但是下面具体的解析内容不再啰嗦
			int type = 0;
			// 判断类型分类操作
			if (type == 1) {
				
			} else if (type == 2) {
				
			} else if (type == 3) {
				
			} else {
				System.out.println("未知的消息类型,解析结束!");
				// 清空缓存
				bbuf.clear();
			}
			// 如果字节流是多余一组数据则递归
			if (byten.length > length) {
				byte[] temp = new byte[bbuf.limit() - length];
				// 数组源,数组源拷贝的开始位子,目标,目标填写的开始位子,拷贝的长度
				System.arraycopy(byten, length, temp, 0, bbuf.limit() - length);
				// 情况缓存
				bbuf.clear();
				// 重新定义缓存
				bbuf.put(temp);
				// 递归回调
				splitByte(ops);
			}else if(byten.length == length){ // 如果只有一条数据,则直接重置缓冲就可以了
				// 清空缓存
				bbuf.clear();
			}
		} else {
			// 如果没有符合格式包含数据,则还原缓冲变量属性
			bbuf.position(p);
			bbuf.limit(l);
		}
	} catch (Exception e) {
		e.printStackTrace();
	}
}

 

代码只是一个参考,主要讲解如何分解缓冲区,和取得缓冲区的一条数据,然后清除该数据原来站的空间。

至于缓冲区的属性,如何得到缓冲区的数据,为什么要清空,bbuf.flip();是什么意思。下面来说一下关于ByteBuffer 的一下事情。

 ByteBuffer 中有几个属性,其中有两个很重要。limit和 position。position开始在0,填充数据后等于数据的长度,而limit是整个缓冲可用的长度。bbuf.flip();之后,position直接变为0,而limit直接等于position。JDK源码如下:

    /**
     * Flips this buffer.  The limit is set to the current position and then
     * the position is set to zero.  If the mark is defined then it is
     * discarded.
     *
     * <p> After a sequence of channel-read or <i>put</i> operations, invoke
     * this method to prepare for a sequence of channel-write or relative
     * <i>get</i> operations.  For example:
     *
     * <blockquote><pre>
     * buf.put(magic);    // Prepend header
     * in.read(buf);      // Read data into rest of buffer
     * buf.flip();        // Flip buffer
     * out.write(buf);    // Write header + data to channel</pre></blockquote>
     *
     * <p> This method is often used in conjunction with the {@link
     * java.nio.ByteBuffer#compact compact} method when transferring data from
     * one place to another.  </p>
     *
     * @return  This buffer
     */
    public final Buffer flip() {
	limit = position;
	position = 0;
	mark = -1;
	return this;
    }

 这样,在position和limit之间的数据就是我们要的可用数据。

但是position和limit是ByteBuffer在put和get时需要的属性,所以在使用后要么还原,要么像上面代码一样,清除一些字节信息然后重置

 ByteBuffer 的get和put不是我们平常的取值和设值一样,他会操纵一些属性变化。

 

请您到ITEYE看我的原创:http://cuisuqiang.iteye.com

或支持我的个人博客,地址:http://www.javacui.com

 

8
2
分享到:
评论
6 楼 allen.lei 2015-10-27  
博主好,
看到您的帖子,http://cuisuqiang.iteye.com/blog/1443212
这个里面的 ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer(); 这个GlobalCommonObjectKeep有吗?具体做什么操作啊?

求GlobalCommonObjectKeep。
多谢

我的qq 25897555
邮箱 25897555@qq.com

另外,我的tcp socket长链接数据收不全,接收貌似分包了:
我的客户端接收代码:
while (!done && thread == readerThread){
while(connection.byreader.available()>3){
Log.d("summercoolAPP", "PacketReader 2");

int length = connection.byreader.read();

Log.d("summercoolAPP", "PacketReader 3 length="+length);
if (connection.byreader.available() < length){
Log.d("summercoolAPP", "PacketReader 5");
return;
}
Log.d("summercoolAPP", "PacketReader 6");
//
byte[] buffer = new byte[length];
try {
connection.byreader.read(buffer);
Object recMsg = null;
try{
recMsg = SERIALIZER.deserialize(buffer);
}catch(Exception e){
recMsg = null;
e.printStackTrace();
}
if (recMsg == null) {
continue;
}
if (recMsg instanceof byte[]) {
recMsg = CustomSerializer.decode((byte[]) recMsg);

}
System.out.println(recMsg);
Log.d("summercoolAPP", "PacketReader 7 recMsg="+recMsg);
//
// 如果接收到Server发送的握手反馈消息,则回送握手完成消息
if (recMsg instanceof HandshakeAck) {
Log.d("summercoolAPP", "PacketReader 8");
// 构造握手完成消息包
HandshakeFinish finish = new HandshakeFinish(groupName);
byte[] hfBytes = SERIALIZER.serialize(finish);
// 发送
connection.sendSpecial4HandshakeAck(new String(hfBytes,"UTF-8"));
}

else if(recMsg instanceof Heartbeat){
byte[] htBytes = SERIALIZER.serialize(Heartbeat.getSingleton());
connection.sendSpecial4HandshakeAck(new String(htBytes,"UTF-8"));
}

else if(recMsg instanceof HandshakeRequest){
}
else if(recMsg instanceof HandshakeFinish){

}
else{//具体业务parser4bs
parser.setInput(new StringReader((String)recMsg));
parser4bs();
}
} catch (Exception e) {
Log.d("summercoolAPP", "PacketReader 9");
e.printStackTrace();
done = true;
}
}
}

服务端:
发送字节流,
格式是:
length(4字节)+ID(1字节)+content
length的值是ID1字节+content的长度

我的服务端发送一段完整业务数据给客户端时,客户端接收是分段的,且没啥规律
对了,我的服务端在发送数据时,起了一个现成定时去ping客户端,也会发送ping包,我在客户端接收业务数据时,也看到了分包的中间穿插了ping包,数据
求如何接收完整业务数据啊?
多谢了
5 楼 allen.lei 2015-10-27  

这个里面的 ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer();   这个GlobalCommonObjectKeep有吗?具体做什么操作啊?

求GlobalCommonObjectKeep。
多谢

我的qq 25897555
邮箱 25897555@qq.com
4 楼 cuisuqiang 2012-03-09  
canghailan 写道
现在JavaEye的大牛基本都归隐了。。。

只有我们这些小辈的瞎折腾了
3 楼 canghailan 2012-03-08  
现在JavaEye的大牛基本都归隐了。。。
2 楼 cuisuqiang 2012-03-08  
canghailan 写道
引用
/** 
 * 全局MVB数据缓冲区 占用 1M 内存 
 */  
private static ByteBuffer bbuf = ByteBuffer.allocate(10240);  
  
/** 
 * 线程安全的取得缓冲变量 
 */  
public static synchronized ByteBuffer getByteBuffer() {  
    return bbuf;  
}

获取方法加了synchronized,但是获取的ByteBuffer不是线程安全的。
想要线程安全的话:
(1)使用ThreadLocal
(2)每次分配重新分配
(3)每次使用bbuf时synchronized或者加锁

这么牛逼的人物竟然是新申请的号,难道是专门出来灭我们的
1 楼 canghailan 2012-03-07  
引用
/** 
 * 全局MVB数据缓冲区 占用 1M 内存 
 */  
private static ByteBuffer bbuf = ByteBuffer.allocate(10240);  
  
/** 
 * 线程安全的取得缓冲变量 
 */  
public static synchronized ByteBuffer getByteBuffer() {  
    return bbuf;  
}

获取方法加了synchronized,但是获取的ByteBuffer不是线程安全的。
想要线程安全的话:
(1)使用ThreadLocal
(2)每次分配重新分配
(3)每次使用bbuf时synchronized或者加锁

相关推荐

Global site tag (gtag.js) - Google Analytics