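I've been learning Python recently, and I came across someone online asking a question about Python multithreading. The task: read IP information from a file, use the Baidu Map open API to get the latitude and longitude of each IP's location, and finally sort the results into "request succeeded" and "request failed" and save them to two separate files. Since I'm learning too, I tried it myself and implemented it in both Python 3 and Java.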
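A quick look at the requirements shows two performance bottlenecks:
1. querying the Baidu Map OPEN API;
2. writing the result files to disk.
So I decided to handle these two parts separately, which naturally led to the producer-consumer pattern.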
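A minimal sketch of that producer-consumer split, under a few assumptions: fake_query is a hypothetical stand-in for the Baidu API call (so the example runs offline), the main thread plays the producer, several query threads consume from one queue and push results to a second queue, and a single writer thread drains it. Shutting down with None sentinels is a choice made for this sketch, not something the original programs do.

```python
import queue
import threading


def fake_query(ip):
    """Hypothetical stand-in for the API call: 'succeeds' when the last octet is even."""
    last_octet = int(ip.rsplit(".", 1)[1])
    return ("SUCCESS", ip) if last_octet % 2 == 0 else ("Failed", ip)


def run_pipeline(ips, n_query_threads=4):
    todo = queue.Queue()   # producer -> query threads
    done = queue.Queue()   # query threads -> writer
    success, failed = [], []

    def query_worker():
        while True:
            ip = todo.get()
            if ip is None:        # sentinel: no more work for this worker
                done.put(None)    # tell the writer this worker has finished
                return
            done.put(fake_query(ip))

    def writer():
        finished = 0
        while finished < n_query_threads:
            item = done.get()
            if item is None:
                finished += 1
                continue
            status, ip = item
            # only this thread touches the result lists, so no lock is needed
            (success if status == "SUCCESS" else failed).append(ip)

    workers = [threading.Thread(target=query_worker) for _ in range(n_query_threads)]
    writer_thread = threading.Thread(target=writer)
    for t in workers:
        t.start()
    writer_thread.start()
    for ip in ips:               # produce the work items
        todo.put(ip)
    for _ in workers:            # one sentinel per query thread
        todo.put(None)
    for t in workers:
        t.join()
    writer_thread.join()
    return success, failed
```

Because the slow network calls happen in many threads while a single writer owns the result lists, the disk side needs no locking at all; the original programs instead use two writer threads and protect the shared lists with a lock.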
The IP file looks like this (my test file has 41,255 lines):
61.0.0.0__61.255.255.255__亚洲____
61.157.0.0__61.157.255.255__四川____
61.139.0.0__61.139.127.255__四川____
61.130.93.0__61.130.254.255__浙江____
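Each line is start IP, end IP and region joined by "__", with trailing empty fields. A small sketch of parsing one line, applying the same filter both programs use (at least three fields, and dots in both IP fields); IPRange and parse_line are illustrative names, not from the original code.

```python
from typing import NamedTuple, Optional


class IPRange(NamedTuple):
    start_ip: str
    end_ip: str
    region: str


def parse_line(line: str) -> Optional[IPRange]:
    """Parse one 'start__end__region____' line, or return None for lines to skip."""
    fields = line.rstrip("\r\n").split("__")
    # same check as get_ip_iter (Python) and formatIPStruct (Java)
    if len(fields) <= 2 or "." not in fields[0] or "." not in fields[1]:
        return None
    return IPRange(fields[0], fields[1], fields[2])
```

For example, "61.0.0.0__61.255.255.255__亚洲____" splits into five fields, the last two empty, so only the first three are used.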
Here is the Python 3 implementation:
# coding: utf-8
import urllib.request
import json
import threading
import queue

FILE_ENCODING = "utf-8"
successList = []
failList = []
successPath = r"success.txt"
failPath = r"failed.txt"
ipPath = "ip.txt"
queryCount = 5      # IP records handed to a query thread per batch
threadCount = 20    # number of query (ThreadUrl) threads
lock = threading.RLock()
separator = "|"
saveCount = 100     # flush a result list to disk every saveCount items
successIndex = 0
failIndex = 0
theCount = 0
queryQueue = queue.Queue()      # batches of IPStruct waiting to be queried
processedQueue = queue.Queue()  # queried IPStruct items waiting to be written


class IPStruct(object):
    def __init__(self):
        self.startIP = None
        self.endIP = None
        self.address = ""
        self.position = ""
        self.status = ""


class QueryIPLocation(object):
    _retry = 5

    def query(self, objList):
        for obj in objList:
            url = self.get_url(obj)
            # print("url:%s" % url)
            for j in range(0, self._retry + 1):
                try:
                    result = urllib.request.urlopen(url, timeout=5)
                except Exception:
                    if j == self._retry:
                        obj.status = "Failed"
                        break
                    continue
                if result.getcode() != 200:
                    if j == self._retry:
                        obj.status = "Failed"
                else:
                    jsonRet = json.loads(result.read().decode())
                    if jsonRet['status'] == 0:
                        obj.status = "SUCCESS"
                        self.proc_ok(jsonRet, obj)
                        break
                    elif jsonRet['status'] == 1:
                        obj.status = "SERVER_ERROR"
                        break
                    elif jsonRet['status'] == 2:
                        obj.status = "INVALID_PARAMETER"
                        break
                    elif jsonRet["status"] == 5:
                        obj.status = "INVALID_AK"
                        break
                    elif jsonRet["status"] == 101:
                        obj.status = "DISABLE_SERVICE"
                        break
                    elif jsonRet['status'] == 345:
                        # per-minute limit: retry, record the status only after the last attempt
                        if j == self._retry:
                            obj.status = "MINUTE_QUERY_LIMIT"
                    elif jsonRet["status"] == 355:
                        obj.status = "DAY_QUERY_LIMIT"
                        break

    def get_url(self, obj):
        base = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll"
        return base + "&ip=" + str(obj.startIP)

    def proc_ok(self, result, obj):
        point = result["content"]["point"]
        address = result["content"]["address"]
        obj.address = address
        obj.position = str(point['x']) + "," + str(point['y'])


class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""

    def __init__(self, queryQueue, processedQueue):
        threading.Thread.__init__(self)
        self.queryQueue = queryQueue
        self.processedQueue = processedQueue

    def run(self):
        while True:
            queryList = self.queryQueue.get()
            app = QueryIPLocation()
            try:
                app.query(queryList)
                for item in queryList:
                    self.processedQueue.put(item)
            except Exception:
                print("error while querying a batch")
                raise
            finally:
                self.queryQueue.task_done()


class ThreadFile(threading.Thread):
    """Threaded save File"""

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        global successIndex, failIndex, theCount
        while True:
            item = self.queue.get()
            try:
                with lock:
                    theCount += 1
                    print("theCount:", theCount)
                    if item.status == "SUCCESS":
                        successList.append(item)
                        successIndex += 1
                        if successIndex == saveCount:
                            save_result(successPath, resultSequence=successList)
                            successIndex = 0
                            del successList[:]
                    else:
                        failList.append(item)
                        failIndex += 1
                        if failIndex == saveCount:
                            save_result(failPath, resultSequence=failList)
                            failIndex = 0
                            del failList[:]
            finally:
                # always mark the item done, otherwise processedQueue.join() would hang
                self.queue.task_done()


def save_result(filePath, resultSequence=()):
    """Append results to filePath: position for successes, status for failures."""
    lines = []
    for item in resultSequence:
        tail = item.position if item.status == "SUCCESS" else item.status
        lines.append(separator.join([item.startIP, item.endIP, item.address, tail]) + "\n")
    # open() raises on failure, so no separate "could not open" check is needed
    with open(filePath, "a", encoding=FILE_ENCODING) as file:
        file.write("".join(lines))


def get_ip_iter(file):
    """Yield batches of up to queryCount IPStruct objects parsed from file."""
    ipList = []
    for line in file:
        addr = line.split("__")
        count = len(addr)
        if count <= 2 or "." not in addr[0] or "." not in addr[1]:
            continue
        obj = IPStruct()
        obj.startIP = addr[0]
        obj.endIP = addr[1]
        obj.address = addr[2]
        ipList.append(obj)
        if len(ipList) == queryCount:
            yield ipList
            ipList = []   # start a fresh batch instead of clearing the one just yielded
    if ipList:
        yield ipList


def main():
    print("start read file")
    with open(ipPath, "r", encoding=FILE_ENCODING) as ipFile:
        iterIpFile = get_ip_iter(ipFile)
        for i in range(threadCount):
            t = ThreadUrl(queryQueue, processedQueue)
            t.daemon = True   # setDaemon() is deprecated since Python 3.10
            t.start()
        for queryList in iterIpFile:
            queryQueue.put(list(queryList))
    for i in range(2):
        dt = ThreadFile(processedQueue)
        dt.daemon = True
        dt.start()
    queryQueue.join()
    processedQueue.join()
    # flush whatever is left (fewer than saveCount items in each list)
    save_result(successPath, resultSequence=successList)
    save_result(failPath, resultSequence=failList)


if __name__ == "__main__":
    main()
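The retry loop in QueryIPLocation.query mixes the HTTP call with the status-code handling. A sketch of separating the two, assuming fetch is a hypothetical zero-argument callable that returns the decoded JSON dict or raises on a network error. The status names are the ones used above; unlike the original, unknown codes end as "Failed" instead of an empty status, and the optional delay between attempts (the original retries immediately) is an assumption that may help with the per-minute limit.

```python
import time

# status codes and names as used in QueryIPLocation.query above
FINAL_STATUS = {
    1: "SERVER_ERROR",
    2: "INVALID_PARAMETER",
    5: "INVALID_AK",
    101: "DISABLE_SERVICE",
    355: "DAY_QUERY_LIMIT",
}
RETRYABLE_STATUS = {345: "MINUTE_QUERY_LIMIT"}


def query_with_retry(fetch, retry=5, delay=0.0):
    """Call fetch up to retry + 1 times; return (status_name, json_dict_or_None)."""
    last = "Failed"
    for attempt in range(retry + 1):
        try:
            data = fetch()
        except Exception:
            last = "Failed"            # network error: try again
        else:
            code = data.get("status")
            if code == 0:
                return "SUCCESS", data
            if code in FINAL_STATUS:   # retrying will not help
                return FINAL_STATUS[code], data
            last = RETRYABLE_STATUS.get(code, "Failed")
        if attempt < retry and delay:
            time.sleep(delay)          # optional pause before the next attempt
    return last, None
```

Passing the fetch function in makes the retry logic testable without touching the network.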
Run it with time python3 ***.py:

real    15m46.962s
user    0m38.042s
sys     1m0.162s

As you can see, the run took 15 minutes 46 seconds.
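The shell's time command measures the whole process, including interpreter start-up, while the Java program measures its own run with System.currentTimeMillis(). A sketch of timing the Python program the same in-process way; timed is a hypothetical helper, and it prints in the same "total cost:<ms>" shape as the Java program.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, report=print):
    """Measure the wall-clock time of the with-block and report it in milliseconds."""
    result = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        result["ms"] = (time.perf_counter() - start) * 1000
        report(f"{label}:{result['ms']:.0f}")
```

To use it, replace the plain main() call at the bottom of the script with a main() call inside a with timed("total cost"): block.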
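Since I had always studied Java before, I got curious whether a multithreaded implementation in Java would run faster than the one in Python, so I implemented the same requirement in Java as well, using the same number of threads.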
Here is the Java implementation:
package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import net.sf.json.JSONObject;
import util.HttpUtil;   // the author's own helper classes, not shown in the post
import util.JSONUtil;

public class IPQueryTest {
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        BlockingQueue<IPStruct> fileQueue = new LinkedBlockingQueue<IPStruct>();
        BlockingQueue<IPStruct> ipQueue = new LinkedBlockingQueue<IPStruct>();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new ThreadReadFile(fileQueue));
        for (int i = 0; i < 20; i++) {
            exec.execute(new ThreadQueryIPLocation(fileQueue, ipQueue));
        }
        for (int i = 0; i < 2; i++) {
            exec.execute(new ThreadWriteFile(ipQueue));
        }
        while (true) {
            Thread.sleep(10);
            // Two empty queues are not enough: records can still be in flight inside a
            // query or writer thread. Stop only when the reader is done and every record
            // it produced has been stored by a writer.
            if (ThreadReadFile.finished
                    && ThreadWriteFile.counter.get() == ThreadReadFile.produced.get()) {
                ThreadWriteFile.saveResult(ThreadWriteFile.SUCC_PATH, ThreadWriteFile.succIPStructs);
                ThreadWriteFile.saveResult(ThreadWriteFile.FAIL_PATH, ThreadWriteFile.failIPStructs);
                long stend = System.currentTimeMillis();
                System.out.println("total cost:" + (stend - start));
                break;
            }
        }
        // The workers block forever in take(); interrupt them so the JVM can exit.
        exec.shutdownNow();
    }
}

class IPStruct {
    private String startIP;
    private String endIP;
    private String address;
    private String position;
    private String status;

    public String getStartIP() { return startIP; }
    public void setStartIP(String startIP) { this.startIP = startIP; }
    public String getEndIP() { return endIP; }
    public void setEndIP(String endIP) { this.endIP = endIP; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public String getPosition() { return position; }
    public void setPosition(String position) { this.position = position; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
}

class ThreadReadFile implements Runnable {
    private final BlockingQueue<IPStruct> queue;
    private static final String FILE_PATH = "D:\\ip.txt";
    static final AtomicInteger produced = new AtomicInteger(0); // records put on the queue
    static volatile boolean finished = false;                   // set once the file has been read

    ThreadReadFile(BlockingQueue<IPStruct> q) {
        this.queue = q;
    }

    @Override
    public void run() {
        try {
            produceIPS(queue);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public void produceIPS(BlockingQueue<IPStruct> queue) throws InterruptedException {
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(new File(FILE_PATH)));
            String line = null;
            while ((line = reader.readLine()) != null) {
                IPStruct ipStruct = formatIPStruct(line);
                if (null != ipStruct) {
                    produced.incrementAndGet();   // count before consumers can see the record
                    queue.add(ipStruct);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            finished = true;
            if (null != reader) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public IPStruct formatIPStruct(String line) {
        String[] addr = line.split("__");
        if (addr.length <= 2 || addr[0].indexOf(".") == -1 || addr[1].indexOf(".") == -1) {
            return null;
        }
        IPStruct ipStruct = new IPStruct();
        ipStruct.setStartIP(addr[0]);
        ipStruct.setEndIP(addr[1]);
        ipStruct.setAddress(addr[2]);
        return ipStruct;
    }
}

class ThreadQueryIPLocation implements Runnable {
    private final BlockingQueue<IPStruct> inputQueue;
    private final BlockingQueue<IPStruct> outputQueue;

    ThreadQueryIPLocation(BlockingQueue<IPStruct> inputQueue, BlockingQueue<IPStruct> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                IPStruct ipStruct = inputQueue.take();
                try {
                    queryIPLocation(ipStruct);
                } catch (RuntimeException e) {
                    // e.g. malformed JSON: record a failure instead of losing the record
                    ipStruct.setStatus("Failed");
                }
                outputQueue.add(ipStruct);
            }
        } catch (InterruptedException ex) {
            // interrupted by exec.shutdownNow(): exit quietly
        }
    }

    private static final int RETRY = 5;
    private static final String BASE_URL =
            "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll";

    public void queryIPLocation(IPStruct obj) {
        String url = BASE_URL + "&ip=" + obj.getStartIP();
        String result = null;
        //System.out.println("url-->" + url);
        for (int j = 0; j <= RETRY; j++) {
            try {
                result = HttpUtil.getJsonStringFromUrl(url);
            } catch (Exception e) {
                if (j == RETRY) {
                    obj.setStatus("Failed");
                    break;
                }
                continue;
            }
            if (null == result) {
                // no response body: retry, and mark as failed after the last attempt
                if (j == RETRY) {
                    obj.setStatus("Failed");
                }
                continue;
            }
            JSONObject jsonObject = JSONUtil.toJSONObject(result);
            Integer status = (Integer) jsonObject.get("status");
            if (status == 0) {
                obj.setStatus("SUCCESS");
                procOK(jsonObject, obj);
                break;
            } else if (status == 1) {
                obj.setStatus("SERVER_ERROR");
                break;
            } else if (status == 2) {
                obj.setStatus("INVALID_PARAMETER");
                break;
            } else if (status == 5) {
                obj.setStatus("INVALID_AK");
                break;
            } else if (status == 101) {
                obj.setStatus("DISABLE_SERVICE");
                break;
            } else if (status == 345) {
                // per-minute limit: retry, record the status only after the last attempt
                if (j == RETRY) {
                    obj.setStatus("MINUTE_QUERY_LIMIT");
                }
            } else if (status == 355) {
                obj.setStatus("DAY_QUERY_LIMIT");
                break;
            }
        }
    }

    public void procOK(JSONObject result, IPStruct obj) {
        JSONObject contentJSON = JSONUtil.toJSONObject(result.get("content"));
        JSONObject pointJSON = JSONUtil.toJSONObject(contentJSON.get("point").toString());
        obj.setAddress((String) contentJSON.get("address"));
        obj.setPosition(pointJSON.get("x") + "," + pointJSON.get("y"));
    }
}

class ThreadWriteFile implements Runnable {
    public static final List<IPStruct> succIPStructs = new ArrayList<IPStruct>();
    public static final List<IPStruct> failIPStructs = new ArrayList<IPStruct>();
    public static final String SEPARATOR = "|";
    public static final int FILE_SAVE_SIZE = 100;
    public static final String SUCC_PATH = "D:\\success.txt";
    public static final String FAIL_PATH = "D:\\fail.txt";
    public static final AtomicInteger counter = new AtomicInteger(0);
    private final BlockingQueue<IPStruct> queue;

    ThreadWriteFile(BlockingQueue<IPStruct> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                IPStruct ipStruct = queue.take();
                // compare strings with equals(), not ==
                if ("SUCCESS".equals(ipStruct.getStatus())) {
                    saveSuccIPStruct(ipStruct);
                } else {
                    saveFailIPStruct(ipStruct);
                }
                count();   // counted only after the record is stored, so main() can rely on it
            }
        } catch (InterruptedException ex) {
            // interrupted by exec.shutdownNow(): exit quietly
        }
    }

    private void count() {
        // AtomicInteger is already thread-safe, no synchronized needed
        System.out.println("count:" + counter.incrementAndGet());
    }

    public static void writeResult(String lines, String path) {
        if (null == lines || lines.trim().equals("")) {
            return;
        }
        try (FileWriter writer = new FileWriter(path, true)) {
            writer.write(lines);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // static synchronized: both writer instances share the static lists, so they must
    // lock the same monitor (the class), not their own instance
    public static synchronized void saveSuccIPStruct(IPStruct ipStruct) {
        succIPStructs.add(ipStruct);
        if (succIPStructs.size() >= FILE_SAVE_SIZE) {
            StringBuffer lines = new StringBuffer();
            for (IPStruct succIPStruct : succIPStructs) {
                lines.append(succIPStruct.getStartIP()).append(SEPARATOR);
                lines.append(succIPStruct.getEndIP()).append(SEPARATOR);
                lines.append(succIPStruct.getAddress()).append(SEPARATOR);
                lines.append(succIPStruct.getPosition()).append("\n");
            }
            writeResult(lines.toString(), SUCC_PATH);
            succIPStructs.clear();
        }
    }

    public static synchronized void saveFailIPStruct(IPStruct ipStruct) {
        failIPStructs.add(ipStruct);
        if (failIPStructs.size() >= FILE_SAVE_SIZE) {
            StringBuffer lines = new StringBuffer();
            for (IPStruct failIPStruct : failIPStructs) {
                lines.append(failIPStruct.getStartIP()).append(SEPARATOR);
                lines.append(failIPStruct.getEndIP()).append(SEPARATOR);
                lines.append(failIPStruct.getAddress()).append(SEPARATOR);
                lines.append(failIPStruct.getStatus()).append("\n");
            }
            writeResult(lines.toString(), FAIL_PATH);
            failIPStructs.clear();
        }
    }

    public static void saveResult(String path, List<IPStruct> ipStructs) {
        StringBuffer lines = new StringBuffer();
        for (IPStruct ipStruct : ipStructs) {
            lines.append(ipStruct.getStartIP()).append(SEPARATOR);
            lines.append(ipStruct.getEndIP()).append(SEPARATOR);
            lines.append(ipStruct.getAddress()).append(SEPARATOR);
            if ("SUCCESS".equals(ipStruct.getStatus())) {
                lines.append(ipStruct.getPosition()).append("\n");
            } else {
                lines.append(ipStruct.getStatus()).append("\n");
            }
        }
        writeResult(lines.toString(), path);
    }
}
The Java code was run in Eclipse on Windows:
total cost:1132948
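The value is in milliseconds (the difference of two System.currentTimeMillis() calls), so the Java run took 1,132,948 ms, about 18 minutes 53 seconds rather than 11 minutes. Judged by wall-clock time alone, the Java version was therefore roughly 3 minutes slower than the Python one, not faster. The experts like to say that Python cares more about programmer productivity than absolute performance, but this measurement does not show a speed advantage for Java either. The two numbers are not a clean comparison anyway: the Python run was timed with the shell's time command and the Java run inside Eclipse on Windows, and the Python run used only about 1m38s of CPU time (user + sys) out of 15m47s, so both programs spend most of their time waiting on the Baidu API, where network latency and rate limits matter far more than the language. It is also quite possible that the gap comes from my own code.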
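The two programs report their durations in different formats (a bash time value versus a bare millisecond count), which is easy to misread. A small sketch that converts both to the same units; the function names are illustrative, and the input values are the two measurements reported above.

```python
def ms_to_min_sec(ms):
    """Split a millisecond duration into (whole minutes, remaining seconds)."""
    minutes, rest_ms = divmod(ms, 60_000)
    return minutes, rest_ms / 1000


def time_output_to_seconds(text):
    """Parse a bash `time` value such as '15m46.962s' into seconds."""
    minutes, seconds = text.rstrip("s").split("m")
    return int(minutes) * 60 + float(seconds)


java_s = 1132948 / 1000                          # Java "total cost" is in milliseconds
python_s = time_output_to_seconds("15m46.962s")  # "real" line of the Python run
print(ms_to_min_sec(1132948), round(java_s - python_s, 3))
```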
I don't really mean to argue whether Python or Java is better; I just wanted to learn by comparing the two and keep a record of it.