`

【Python真的很强大】多线程的使用

阅读更多

需求:  我们需要监控客户端是否已经掉线, 采用的做法是客服端定时发送消息到 socket server.  我们在socket server确认是否已经收到心跳消息,如果超时(timeout)则会剔除相应的客户端。

          为此,我们准备在python抽象出原型雏形。熟悉Javascript的可能知道定时器函数setTimeoutsetInterval. 

我们在Python(2.7)也可有类似功能: 如下:

    

def set_interval(func,extraArgs,sec):
    def func_wrapper():
        func(*extraArgs)
    t = threading.Timer(sec, func_wrapper)
    t.start()
    return t

 

下一步,为每一个客服端连接(session)建立一个AliveThread来处理,第一次客户端连接来的时候,push msg(msgId, timestamp,...); 第二次来的时候,pop(delete )msg and  append msg. 为此,有了

MsgManager这样的消息汇总,我们就能在一个monitor function中处理掉线.

 

 

为此,我们又需要类似java里面的Vector,ConcurrentHashMap来确保MsgManager线程操作安全。

如下:

 

 

from multiprocessing import Process, Manager
manager = Manager()
MsgManager = manager.list()

 

 

好了,剩下的很简单了:

 

 

set_interval(monitor,[self._msg],NO_MSG_RECEIVED_AFTER_INTERVAL)
def monitor(tmpMsg):
     log.info('kick out')

 

 

需求2: 为了给客服端最快的响应,我们从多个服务端获取信息,最快返回的信息将返回给客户。

为此,我们用 package concurrent:

 

# python2.7.x 安装 https://pypi.python.org/packages/source/f/futures/futures-2.1.6.tar.gz#md5=cfab9ac3cd55d6c7ddd0546a9f22f453
# python3.x 自带
from concurrent import futures
import urllib2

SERVICES = ['http://host1/service',
        'http://host2/service',
        'http://host3/service',
        'http://host4/service']

def load_url(url, timeout):
    return urllib2.urlopen(url, timeout=timeout).read()

with futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = dict((executor.submit(load_url, url, 60), url)
                         for url in SERVICES)

    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        if future.exception() is None:
            print('url: %s,content:%s' % (url, future.result()))
            break

 

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics