关于ZeroMq(简称ZMQ)的定义、作用和强大这里就不再赘述了。总结一下它常见的几种经典模式,然后顺便提下我最近在高并发环境下使用它出现的一个异常及其解决过程吧!
(PS:图是盗的。。。。。)
一、REQ/REP模式
这是最常见的请求/响应模式。服务端作为发送方,客户端作为请求方。
服务端:
ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.REP); String url = "tcp://*:9999"; socket.bind(url); boolean wait = true; while (wait) { byte[] request; try { request = socket.recv(0); socket.send("OK".getBytes(), 1); } catch (ZMQException e) { } }
客户端:
ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.REQ); System.out.println("Connecting to hello world server..."); socket.connect("tcp://localhost:9999"); String requestString = "Hello" + " "; byte[] request = requestString.getBytes(); socket.send(request, ZMQ.NOBLOCK); byte[] reply = socket.recv(0); System.out.println("Received reply [" + new String(reply) + "]");
说明:服务端bind一个端口,客户端则connect一个ip地址(服务端所在的ip)和相应的端口。服务端通过context.socket(ZMQ.REP);表明是服务端,同理,客户端通过context.socket(ZMQ.REQ);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!
二、PUB/SUB模式
发布/订阅模式,这种模式大家想必很熟悉,比如微博消息的推送等。
发布方作为服务端,多个订阅方作为客户端。代码略,总结一下:
发布端bind一个端口,订阅端则connect一个ip地址(服务端所在的ip)和相应的端口。服务端通过context.socket(ZMQ.PUB);表明是发布端,同理,订阅端通过context.socket(ZMQ.SUB);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!——这里发布方作为服务端,订阅方作为客户端。
三、PUSH/PULL模式
总结:push端bind一个端口,pull端则connect一个ip地址(push端端所在的ip)和相应的端口。push端端通过context.socket(ZMQ.PUSH);表明是push端,同理,订阅端通过context.socket(ZMQ.PULL);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!——这里push端作为服务端,pull端作为客户端。
后两种模式写法类似第一种。
---------------------------------------
高并发下,出现的一个异常:
zmq.ZError$IOException: java.io.IOException: Unable to establish loopback connection
at zmq.Signaler.make_fdpair(Signaler.java:87)
at zmq.Signaler.<init>(Signaler.java:48)
at zmq.Mailbox.<init>(Mailbox.java:55)
at zmq.Ctx.<init>(Ctx.java:132)
at zmq.ZMQ.zmq_ctx_new(ZMQ.java:225)
……………………
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:106)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:122)
at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:27)
at java.nio.channels.Pipe.open(Pipe.java:133)
at zmq.Signaler.make_fdpair(Signaler.java:85)
... 36 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:72)
... 41 more
说明ZMQ的连接不够!
解决办法:发现自己的一个类,是并发情况下每笔数据都会经过的类,而我把
ZMQ.Context context = ZMQ.context(1); ZMQ.Socket reg = context.socket(ZMQ.PUSH); reg.connect("tcp://localhost:xxx");
写在这个类的方法里,造成每次经过该类的该方法,都会创建一次context和socket,也许在并发情况还不是很明显的情况下,这个异常不会出现。但在高并发的情形下,频繁地创建ZMQ.Context和ZMQ.Socket并且再connect时,会造成很大的网络开销。故把该代码块放在static{...}类的静态代码块中——无论并发情况如何,只创建和连接一次。while循环里面有的代码,只能是send操作和recv操作!避免把ZMQ的Context,Socket的创建和connect操作放在循环里面。
改完之后,压力测试48w+数据,ZMQ终于没有挂了。。
相关推荐
ZeroMQ是一个很有个性的项目,它原来是定位为“史上最快消息队列”,所以名字里面有“MQ”两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。从网络通信的角度看,它处于...
消息队列zeromq学习的安装包之一libsodium,为学习zeromq安装部署的同学们提供帮助,zeromq是消息队列中的一种技术,安装过程中要注意版本对应。
“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是...
ZeroMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。(摘自百度百科) ZMQ官方网址http://zeromq.org/ ZMQ本身只提供了C++版本的下载,...
客户端向服务端发送一个请求(REQ),然后服务端接收到请求之后给予一个答复(REP),而我的KTV系统也是这样的,播放端请求一首新歌,点播端将播放队列的队首答复回去,或者点播端请求暂停歌曲,播放端回复暂停成功与否...
在学习zeromq时,要将libzmq.dll加载到文件项目中,否则无法编译
“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是...
不错的zeromq学习资料,对尽快了解zeromq的朋友很有帮助。
ZeroMQ学习资料,java项目。有一些简单的例子,可以好好的学习下。
“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是...
像是一个并发式的框架。它提供的套接字可以在多种协议中传输消息,如线程 间、进程间、TCP、广播等。你可以使用套接字构建多对多的连接模式,如扇出、 发布-订阅、任务分发、请求-应答等。ZMQ 的快速足以胜任集群...
ngx_zeromq, 针对 Nginx的ZeroMQ传输 ngx_zeromq 是一个传输模块,它允许 nginx 在与上游服务器通信时使用面向ZeroMQ的传输层。它是 7层协议不可知的,这意味着它可以与任何性能良好的上游模块( 。proxy,fastcgi,...
ZeroMQ是一个开源的消息队列系统,按照官方的定义,它是一个消息通信库,帮助开发者设计分布式和并行的应用程序。 首先,我们需要明白,ZeroMQ不是传统的消息队列系统(比如ActiveMQ、WebSphereMQ、RabbitMQ等)。...
中文版ZeroMQ文档
zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-2.1.7zeromq-...
ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的...ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。
ZeroMQ指南.pdf
ZeroMQ是一个很有个性的项目,它原来是定位为“史上最快消息队列”,所以名字里面有“MQ”两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。从网络通信的角度看,它处于...
一个最基本的client,server通信。直接编译可用
ZeroMQ是一个开源的消息队列系统,按照官方的定义,它是一个消息通信库,帮助开发者设计分布式和并行的应用程序。 首先,我们需要明白,ZeroMQ不是传统的消息队列系统(比如ActiveMQ、WebSphereMQ、RabbitMQ等)。...