- 浏览: 227947 次
- 性别:
- 来自: 淄博
文章分类
- 全部博客 (666)
- java (6)
- android (9)
- 架构 (0)
- android游戏 (13)
- android系统 (14)
- c++ (14)
- 数据库 (10)
- javascript (2)
- 版本控制 (1)
- webservice (1)
- linux (5)
- uml (1)
- android多媒体部分 (16)
- java中的JNI (6)
- HTML5 (5)
- CSS3 (1)
- swing (13)
- 线程并发 (9)
- 分布式 (5)
- 云计算 (1)
- 通信协议 (4)
- xml (4)
- c# (1)
- lucene (0)
- ibatis (0)
- hibernate (3)
- struts1 (3)
- struts2 (4)
- jsf (0)
- spring (5)
- spring for android (0)
- 感悟 (2)
- jpa (1)
- android gis (1)
- jbpm (0)
- java设计模式 (8)
- java web (4)
- EXT js (0)
- node JS (2)
- python (3)
- c (17)
- weblogic (0)
- opencv (1)
最新评论
基于消息方式实现系统间的通信
常用的通信协议:
tcp/ip保证数数据传输的可靠性,会牺牲性能
udp/ip双方不建立联接,面是发送到网上进行传递,性能较好
系统间通信对数据的处理
同步IO常用的是
(BlockingIO)当发起读写操作时,均为阻塞方式,只有当操作完成后才会释放
资源
NIO(Non-BlockingIO)基于事件驱动的,实际上采用的reactor模式2,发起读写操作
时是非阻塞方式,linux2.6以后版本采用epoll3方式来实现NIO
异步操作方式;AIO(基于事件戏动)采用proactor模式4直接调用API的read和write
方法
读取:操作系统将可读的流传达入read方法的缓冲区,通知应用
写入:操作系统把write方法写入的流写入完毕时通知应用程序
Windows基于locp实现了AIOlinux目前只有基于epoll模拟实现的AIO(只有jdk7才支持AIO)
实现方式:
TCP/IP+BIO
采用socket和serverSocket来实现,
客户端的关键代码
publicvoidclient()throwsUnknownHostException,IOException{
Socketsocket=newSocket("服务器ip/域名",123);
//读取服务器返回流
BufferedReaderin=newBufferedReader(newInputStreamReader(socket.getInputStream()));
//向服务器写入的流
PrintWriterout=newPrintWriter(socket.getOutputStream());
//向服务器发送信息
out.println("hello");
//阻塞读取服务端的返回信息
in.readLine();
}
服务器端的关键代码
publicvoidserver()throwsIOException{
//监听的端口
ServerSocketserverSocket=newServerSocket(123);
//设置超时时间
//serverSocket.setSoTimeout(11);
//接收客户端建立联接的请求
//也是通过socket.getInputStream()和socket.getOutputStream();
//来进行读写操作,该方法会一直阻塞到有发送建立联接的请求
Socketsocket=serverSocket.accept();
}
一般是采用联接池的方式来维护socket(容易出现服务器持掉的现象),所以要根据情况设置超时时间,为了能够同时接收多个连接请求,就要在accept取得socket后,将此socket放到一个线程中单独处理,通常称为一连接一线程,防止资源耗尽,必须限制线程数量,这就造成了BIO的情况下服务端能支撑的连接数是有限的
TCP/IP+NIO
采用channel和selector来实现
publicvoidclient()throwsIOException{
SocketChannelsocketChannel=SocketChannel.open();
//设置为非阻塞模式
socketChannel.configureBlocking(false);
//接着返回false,表示正在建立连接
socketChannel.connect(newSocketAddress(){
});
Selectorselector=Selector.open();
//注册selector和感受性趣的连接事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);
//阻塞至有感性趣的事件发生,直到达超时时间
//如果希望一直等,就调用无参的select方法
intnKeys=selector.select();
//如果不阻塞直接返回目前是否有感性趣的事件发生
//selector.selectNow();
//nKeys>0说明有感兴趣的事发生
SelectionKeysKey=null;
if(nKeys>0){
Set<SelectionKey>keys=selector.selectedKeys();
for(SelectionKeykey:keys){
//
if(key.isConnectable()){
SocketChannelsc=(SocketChannel)key.channel();
//非阻塞式
sc.configureBlocking(false);
//注册感性趣的IO事件,通常不常注册写事件
//在缓冲区未满的情况,是一直可写的
sc.register(selector,SelectionKey.OP_READ);
//完成连接的建立
sc.finishConnect();
}
//有流可以读取
elseif(key.isReadable()){
ByteBufferbuffer=ByteBuffer.allocate(1024);
SocketChannelsc=(SocketChannel)key.channel();
intreadBytes=0;
try{
intret=0;
try{
while((ret=sc.read(buffer))>0){
readBytes+=ret;
}
}finally{
buffer.flip();
}
}finally{
if(null!=buffer){
buffer.clear();
}
}
}elseif(key.isWritable()){
//取消对OP_WRITE事件的注册
key.interestOps(key.interestOps()&(~SelectionKey.OP_WRITE));
SocketChannelsc=(SocketChannel)key.channel();
//阻塞操作,直到写入缓冲区或出现异常,返回的为成功写入的字节数
//当发送缓冲区已满,返回0
intwrittenedSize=sc.write(ByteBuffer.allocate(1024));
//如果未写入,则继续注册感性趣的事件
if(writtenedSize==0){
key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
}
}
selector.selectedKeys().clear();
}
//对于要写入的流
intwSize=socketChannel.write(ByteBuffer.allocate(1024));
}
}
publicvoidserver()throwsIOException{
ServerSocketChannelssc=ServerSocketChannel.open();
ServerSocketserverSocket=ssc.socket();
//绑定监听的接口
serverSocket.bind(newInetSocketAddress(123));
ssc.configureBlocking(false);
//注册感兴趣的连接建立事件
ssc.register(Selector.open(),SelectionKey.OP_ACCEPT);
//和客户端同样的方式对selector.select进行轮询,只是添加了一个如下方法
Set<SelectionKey>keys=selector.selectedKeys();
for(SelectionKeykey:keys){
//
if(key.isAcceptable()){
ServerSocketChannelserverSocketChannel=(ServerSocketChannel)key.channel();
SocketChannelsocketChannel=serverSocketChannel.accept();
if(null==socketChannel){
continue;
}
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
}
UDP/IP+BIO
采用socket由于UDP/IP是无连接的,要进行双向通信,必须两端都成为UDPserver
基于datagramSocket和DatagramPacket
客户端和服务器端的代码如下:
//如果双向通信,必须启动一个监听端口,承担服务器的职责
publicvoidclentOrServer(){
try{
DatagramSocketserverSocket=newDatagramSocket(123);
byte[]buffer=newbyte[65507];
DatagramPacketreceivePackep=newDatagramPacket(buffer,buffer.length);
DatagramSocketsocket=newDatagramSocket();
DatagramPacketpacket=newDatagramPacket(datas,datas.length,server,port);
//阻塞发送pack到指定的服务器和端口,
//网络io异常抛出ioexception,
//连不上目标端口porUnreachableException
socket.send(packet);
//阻塞并同步读取流信息,如接收到的流信息比packet长度长
//则删除更长信息,
serverSocket.setSoTimeout(100);//设置读取流的超时时间
serverSocket.receive(receivePackep);
}catch(SocketExceptione){
e.printStackTrace();
}
}
UDP/IP+NIO
采用datagrameChannel和byteBuffer来实现
//一对一的系统间的通信
publicvoidclent(){
try{
DatagramChannelreceiveChannel=DatagramChannel.open();
//非阻塞模式
receiveChannel.configureBlocking(false);
DatagramSocketsocket=receiveChannel.socket();
socket.bind(newInetSocketAddress(123));
Selectorselector=Selector.open();
receiveChannel.register(selector,SelectionKey.OP_ACCEPT);
//可象TCP/IP+NIO中对selector的遍历一样
DatagramChannelsendChannel=DatagramChannel.open();
sendChannel.configureBlocking(false);
SocketAddresstarget=newInetSocketAddress("127.0.0.1",123);
sendChannel.write(ByteBuffer.allocate(1024));
}catch(IOExceptione){
e.printStackTrace();
}
}
try{
DatagramChannelreceiveChannel=DatagramChannel.open();
//非阻塞模式
receiveChannel.configureBlocking(false);
DatagramSocketsocket=receiveChannel.socket();
socket.bind(newInetSocketAddress(123));
Selectorselector=Selector.open();
receiveChannel.register(selector,SelectionKey.OP_ACCEPT);
//可象TCP/IP+NIO中对selector的遍历一样
DatagramChannelsendChannel=DatagramChannel.open();
sendChannel.configureBlocking(false);
SocketAddresstarget=newInetSocketAddress("127.0.0.1",123);
sendChannel.write(ByteBuffer.allocate(1024));
}catch(IOExceptione){
e.printStackTrace();
}
}
基于开源框架实现消息方式的系统间通信
Mina是apache的开源项目,基于是javaNIO构建
关键类为:
loConnector配置客户端的消息处理器io事件处理线程池消息发送/接收的
Filterchain
loAcctptor配置服务器端的io事件处理线程池消息发送/接收的filterChain
loHandler作为mina和应用的接口底层发生事件mina会通知应用实现的
handler
loSession类似于socketChannel的封装,可以进行连接的控制及流信息的输出
它采用filterchain的方式封装消息发送和接收
基于远程调用方式实现系统间的通信
这种方式主要用来实现基于RMI和webservice的应用
RMI(remotemethodinvocation)是java用于实现远程调用的重要机制
Webservice
基于开源框架实现远程调用方式的系统间通信
SpringRMI
相关推荐
Java分布式应用学习笔记01分布式Java应用和SOA
Java分布式学习笔记01分布式Java应用
Java分布式应用学习笔记
Java分布式应用学习笔记07线程池应用
Java分布式应用学习笔记02再谈JVM
Java分布式应用学习笔记-谈JVM
Java分布式应用学习笔记09JMX-MBean的介绍
java分布式应用学习笔记05多线程下的并发同步器.pdf
Java分布式应用学习笔记06浅谈并发加锁机制分析
Java分布式应用学习笔记04JDK的并发包的集合总结
Java分布式应用学习笔记08JMX规范与各种监控场景
Java分布式应用学习笔记05多线程下的并发同步器
Java分布式应用学习笔记08JMX规范与各种监控场景.pdf
NULL 博文链接:https://qi04072008.iteye.com/blog/1683834
Java分布式应用学习笔记03JVM对线程的资源同步和交互机制
java
Java分布式应用学习笔记04JDK的并发包的集合总结
JAVA学习笔记最新ppt版1 JAVA 是一种面向对象的程序语言,具有更高的跨平台可能性。它是 Sun 公司 GreenProject 中撰写 Star7 应用程序的一个程序语言,由 James Gosling 创造。Java 的特性包括简单、面向对象、...
│ │ 第一章第1节: 04-mybatis基础应用之mapper代理开发方式.mp4 │ │ 第一章第1节: 06-mybatis基础应用之全局配置文件.mp4 │ │ 第一章第1节: 08-mybatis基础应用之输入映射2.mp4 │ │ 第一章第1节...