Polling and Sockets
一个线程中有多个sokect,同时需要收发数据,zmq提供polling sockets实现,不用在recv()时阻塞。
下面这个例程中创建一个command server来告诉worker何时退出,worker从Publisher获得订阅并打印,('exit'时退出)。
1.PUSH server ,命令服务
import zmq import time import sys import random from multiprocessing import Process def server_push(port="5556"): context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:%s" % port) print "Running server on port: ", port # serves only 5 request and dies for reqnum in range(10): if reqnum < 6: socket.send("Continue") else: socket.send("Exit") break time.sleep (1)
2.PUB server,发布消息
def server_pub(port="5558"): context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:%s" % port) publisher_id = random.randrange(0,9999) print "Running server on port: ", port # serves only 5 request and dies for reqnum in range(10): # Wait for next request from client topic = random.randrange(8,10) messagedata = "server#%s" % publisher_id print "%s %s" % (topic, messagedata) socket.send("%d %s" % (topic, messagedata)) time.sleep(1)
3.客户端
def client(port_push, port_sub): context = zmq.Context() socket_pull = context.socket(zmq.PULL) socket_pull.connect ("tcp://localhost:%s" % port_push) print "Connected to server with port %s" % port_push socket_sub = context.socket(zmq.SUB) socket_sub.connect ("tcp://localhost:%s" % port_sub) socket_sub.setsockopt(zmq.SUBSCRIBE, "9") print "Connected to publisher with port %s" % port_sub # 初始化Poller poller = zmq.Poller() poller.register(socket_pull, zmq.POLLIN) poller.register(socket_sub, zmq.POLLIN) # Work on requests from both server and publisher should_continue = True while should_continue: socks = dict(poller.poll()) if socket_pull in socks and socks[socket_pull] == zmq.POLLIN: message = socket_pull.recv() print "Recieved control command: %s" % message if message == "Exit": print "Recieved exit command, client will stop recieving messages" should_continue = False if socket_sub in socks and socks[socket_sub] == zmq.POLLIN: string = socket_sub.recv() topic, messagedata = string.split() print "Processing ... ", topic, messagedata
运行
if __name__ == "__main__": # Now we can run a few servers server_push_port = "5556" server_pub_port = "5558" Process(target=server_push, args=(server_push_port,)).start() Process(target=server_pub, args=(server_pub_port,)).start() Process(target=client, args=(server_push_port,server_pub_port,)).start()
看一下api
poll
(timeout=None)
Poll the registered 0MQ or native fds for I/O.
timeout (float, int) – The timeout in milliseconds. If None, no timeout (infinite). This is in milliseconds to be compatible with select.poll() . |
events – The list of events that are ready to be processed. This is a list of tuples of the form (socket, event) , where the 0MQ Socket or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. It is common to callevents = dict(poller.poll()) , which turns the list of tuples into a mapping ofsocket : event . |
list of tuples |
至于POLLIN,POLLOUT:
- flag (int, default=POLLIN|POLLOUT) – 0MQ poll flags. If flag|POLLIN, recv events will be flushed. If flag|POLLOUT, send events will be flushed. Both flags can be set at once, which is the default.
如此这般,不停的轮询注册在poller中的sockter状态,类似与java nio中将channel注册到selector。发现某个socket数据接收就绪(POLLIN),执行业务代码。
但是,用'if '的处理方式有点丑,所以pyzmq提供实现tornador ioloop的IOStream 的类:ZMQStream 来处理polling event,并且这样就可以使用回调。
首先,安装tornador : pip install tornado
然后,改造上面的代码:
import zmq import time import sys import random from multiprocessing import Process from zmq.eventloop import ioloop, zmqstream ioloop.install()ioloop.install()用来告诉tornador.ioloop.IOLoop使用zmq的poller。
相关推荐
Golongpoll 是 golang HTTP 的 longpolling 库,可以使构建 web pub-sub 更加容易。基本用法:import "github.com/jcuga/golongpoll" // This launches a goroutine and creates channels for all the ...
资源分类:Python库 所属语言:Python 资源全名:polling2-0.4.7.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
lpc17xx_libcfg.h: Library configuration file - include needed driver library for this example makefile: Example's makefile (to build with GNU toolchain)adc_burst_test.c: Main program
该文章主要描述了对带有重传的轮询系统的改进与仿真实现。
GD32 MCU 开发板 Workshop 报告:GD32E231C-Start开发板按键长短按识别-GPIO_Key_Polling_mode.zip
基于STM32F103ZETX芯片,使用STM32CubeMx实现SDIO轮询方式读写数据的源文件。
Peter's Polling Package Highly customizable Polling/Voting controls for asp.net web sites
Learning about different client/server communication models including but not limited to client polling, Server-Sent Events and WebSockets. Efficiently use WebSockets, Server-Sent Events, and JSON in ...
两个12位带缓冲的DAC通道可以用于转换2路数字信号成为2路模拟电压信号并输出。这项功能内部是通过集成的电阻串和反向的放大器实现。这个双数字接口支持下述功能:参照2009年3月 STM32F103xCDE数据手册 英文第5版
Source Code and Errata Availability Acknowledgments Part 1: Introduction and TCP/IP Chapter 1. Introduction Section 1.1. Introduction Section 1.2. A Simple Daytime Client Section 1.3. ...
该程序对研究轮询算法的人很有帮助,很值得借鉴!
HTML 5 Web Sockets is a powerful and effective technique for real-time information processing. There exists many techniques such as Poling, Long Poling...Here goes a comparison of polling vs Web Sockets.
s brilliant and elegant tour of the modern science slash art of forecasting shows what happens when Big Data meets human nature Baseball weather forecasting earthquake prediction economics and polling...
SDMMC驱动SD卡程序,见附件,
libgdx PollingTest Polling
https://events.static.linuxfound.org/sites/events/files/slides/lemoal-nvme-polling-vault-2017-final_0.pdf
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
Learn how to use long polling to "push" content from your server to browsers Create an application using the Tornado web server that makes sense of massive amounts of streaming content Understand the ...
Simple Polling System in Python using Django Framework