shuiyutian

Python Multithreading vs. Java Multithreading


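I've recently been learning Python, and online I came across someone asking about a Python multithreading problem. The task was to read IP ranges from a file and use the Baidu Maps Open API to look up the latitude and longitude of each IP's location. The results then had to be sorted into "request succeeded" and "request failed" and saved to two separate files. Since I'm learning too, I gave it a try myself and implemented it in both Python 3 and Java.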

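A quick look at the requirements shows two performance bottlenecks:

1. querying the Baidu Maps Open API;

2. writing the results to disk.

So I decided to handle these two parts separately, and the producer-consumer pattern was the natural choice.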
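Below is a minimal sketch of that producer-consumer layout. To keep it runnable offline, the network call is replaced by a hypothetical fake_lookup function. One queue.Queue feeds the worker threads (the "query" stage), and a second queue carries their results to a single writer (the "disk" stage). A None sentinel tells each consumer to stop. The function names and the sentinel shutdown are illustrative choices. The full script below uses daemon threads and Queue.join() instead.

```python
import queue
import threading

SENTINEL = None  # put on a queue to tell its consumer to stop


def fake_lookup(ip):
    """Hypothetical stand-in for the HTTP query: fails for IPs ending in '.0'."""
    status = "Failed" if ip.endswith(".0") else "SUCCESS"
    return ip, status


def query_worker(in_q, out_q):
    # Stage 1 consumer: take an IP, "query" it, hand the result to stage 2
    while True:
        ip = in_q.get()
        if ip is SENTINEL:
            break
        out_q.put(fake_lookup(ip))


def writer(out_q, success, failed):
    # Stage 2 consumer: the only thread touching the result lists, so no lock is needed
    while True:
        item = out_q.get()
        if item is SENTINEL:
            break
        ip, status = item
        (success if status == "SUCCESS" else failed).append(ip)


def run_pipeline(ips, n_workers=4):
    in_q, out_q = queue.Queue(), queue.Queue()
    success, failed = [], []
    workers = [threading.Thread(target=query_worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    w = threading.Thread(target=writer, args=(out_q, success, failed))
    for t in workers + [w]:
        t.start()
    for ip in ips:           # the main thread acts as the producer
        in_q.put(ip)
    for _ in workers:        # one sentinel per query worker
        in_q.put(SENTINEL)
    for t in workers:
        t.join()             # every result is now on out_q
    out_q.put(SENTINEL)      # only then stop the writer
    w.join()
    return success, failed


if __name__ == "__main__":
    ok, bad = run_pipeline(["61.0.0.0", "61.157.0.1", "61.139.0.2"])
    print(sorted(ok), sorted(bad))
```

The writer is stopped only after all query workers have exited, so no result can arrive after its sentinel.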

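The IP file has the following format (my test file has 41,255 lines). Each line holds a start IP, an end IP and a region name (亚洲 = Asia, 四川 = Sichuan, 浙江 = Zhejiang), separated by double underscores: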

   

    61.0.0.0__61.255.255.255__亚洲____

    61.157.0.0__61.157.255.255__四川____

    61.139.0.0__61.139.127.255__四川____

    61.130.93.0__61.130.254.255__浙江____
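Below is a minimal parser for one such line. It applies the same filter as get_ip_iter in the script below: at least three fields, and a dot in both IP fields. The IPRange named tuple and parse_ip_line are hypothetical names chosen for illustration.

```python
from typing import NamedTuple, Optional


class IPRange(NamedTuple):
    start_ip: str
    end_ip: str
    region: str


def parse_ip_line(line: str) -> Optional[IPRange]:
    """Parse 'start__end__region____' into an IPRange, or return None for junk lines."""
    # strip() drops the trailing newline; the extra trailing "__" fields become empty strings
    fields = line.strip().split("__")
    # Same check as get_ip_iter: at least three fields and a dot in both IP fields
    if len(fields) <= 2 or "." not in fields[0] or "." not in fields[1]:
        return None
    return IPRange(fields[0], fields[1], fields[2])


if __name__ == "__main__":
    print(parse_ip_line("61.157.0.0__61.157.255.255__四川____"))
```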

    

   

Below is the Python 3 implementation:

#coding:utf-8
import urllib.request
import json
import threading
import queue

FILE_ENCODING = "utf-8"
successList = []
failList = []
successPath = r"success.txt"
failPath = r"failed.txt"
ipPath = "ip.txt"
queryCount = 5
threadCount = 20
lock = threading.RLock()
separator = "|"
saveCount = 100
successIndex = 0
failIndex = 0
theCount = 0


queryQueue = queue.Queue()
processedQueue = queue.Queue()

class IPStruct(object):
    def __init__(self):
        self.startIP = None
        self.endIP = None
        self.address = ""
        self.position = ""
        self.status = ""


class QueryIPLocation(object):
    _retry = 5
    
    def query(self, objList):
        for obj in objList:
            url = self.get_url(obj)#.encode('utf-8')
#            print("url:%s" % url)
            for j in range(0, self._retry + 1):
                try:
                    # Read and parse inside the try block: a timeout during read()
                    # or a malformed body is retried instead of killing the worker thread
                    with urllib.request.urlopen(url, timeout=5) as result:
                        code = result.getcode()
                        jsonRet = json.loads(result.read().decode())
                except Exception:
                    if j == self._retry:
                        obj.status = "Failed"
                        break
                    continue

                if code != 200:
                    if j == self._retry:
                        obj.status = "Failed"
                else:
                    if jsonRet['status'] == 0:
                        obj.status = "SUCCESS"
                        self.proc_ok(jsonRet, obj)
                        break
                    elif jsonRet['status'] == 1:
                        obj.status = "SERVER_ERROR"
                        break
                    elif jsonRet['status'] == 2:
                        obj.status = "INVALID_PARAMETER"
                        break
                    elif jsonRet["status"] == 5:
                        obj.status = "INVALID_AK"
                        break
                    elif jsonRet["status"] == 101:
                        obj.status = "DISABLE_SERVICE"
                        break
                    elif jsonRet['status'] == 345:
                        if j == self._retry:
                            obj.status = "MINUTE_QUERY_LIMIT"
                    elif jsonRet["status"] == 355:
                        obj.status = "DAY_QUERY_LIMIT"
                        break

    def get_url(self, obj):
        base = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll"
        return base + "&ip=" + str(obj.startIP)

    def proc_ok(self, result, obj):
        point = result["content"]["point"]
        address = result["content"]["address"]
        obj.address = address
        obj.position = str(point['x']) + "," + str(point['y'])

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self,queryQueue,processedQueue):
        threading.Thread.__init__(self)
        self.queryQueue = queryQueue
        self.processedQueue = processedQueue

    def run(self):
        while True:
            queryList = self.queryQueue.get()
            app = QueryIPLocation()
            try:
                app.query(queryList)
                for item in queryList:
                    self.processedQueue.put(item)
            except Exception:
                print("there has error..........")
                raise
            finally:
                self.queryQueue.task_done()
    
class ThreadFile(threading.Thread):
    """Threaded save File"""
    
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        global successList
        global failList
        global successIndex
        global failIndex
        global theCount
        while True:
            item = self.queue.get()
            try:
                # One lock guards both buffers, the counters and the file writes
                with lock:
                    theCount += 1
                    print("theCount:", theCount)
                    if item.status == "SUCCESS":
                        successList.append(item)
                        successIndex += 1
                        if successIndex == saveCount:
                            save_result(successPath, resultSequence=successList)
                            successIndex = 0
                            del successList[:]
                    else:
                        failList.append(item)
                        failIndex += 1
                        if failIndex == saveCount:
                            save_result(failPath, resultSequence=failList)
                            failIndex = 0
                            del failList[:]
            finally:
                # Always mark the item done; otherwise processedQueue.join() in main() can hang
                self.queue.task_done()

def save_result(filePath, resultSequence=()):
    """Append buffered results to filePath, one separator-joined line per item."""
    lines = []
    for item in resultSequence:
        # Successful lookups record the coordinates; failures record the status
        last = item.position if item.status == "SUCCESS" else item.status
        lines.append(separator.join([item.startIP, item.endIP, item.address, last]) + "\n")
    # open() raises OSError on failure; the with block closes the file afterwards.
    # An explicit encoding keeps Chinese addresses intact on any platform.
    with open(filePath, "a", encoding=FILE_ENCODING) as file:
        file.writelines(lines)

def get_ip_iter(file):
    ipList = []
    for line in file:
        addr = line.split("__")
        count = len(addr)
        if count <= 2 or "." not in addr[0] or "." not in addr[1]:
            continue
        obj = IPStruct()
        obj.startIP = addr[0]
        obj.endIP = addr[1]
        obj.address = addr[2]
        ipList.append(obj)
        if len(ipList) == queryCount:
            yield ipList
            del ipList[:]
    else:
        if len(ipList) > 0:
            yield ipList

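def main():
    print("start read file")
    # Start the query workers; daemon threads let the process exit once main() returns
    for i in range(threadCount):
        t = ThreadUrl(queryQueue, processedQueue)
        t.daemon = True  # Thread.setDaemon() is deprecated since Python 3.10
        t.start()

    # open() raises OSError if the file is missing; "with" closes it afterwards
    with open(ipPath, "r", encoding=FILE_ENCODING) as ipFile:
        for queryList in get_ip_iter(ipFile):
            # get_ip_iter reuses one list object, so enqueue a copy
            queryQueue.put(list(queryList))

    # Start the two file-writing consumers
    for i in range(2):
        dt = ThreadFile(processedQueue)
        dt.daemon = True
        dt.start()

    # Block until every batch has been queried and every result has been buffered
    queryQueue.join()
    processedQueue.join()

    # Flush the leftovers (fewer than saveCount items per list)
    save_result(successPath, resultSequence=successList)
    save_result(failPath, resultSequence=failList)


if __name__ == "__main__":
    main()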
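The if/elif chain in QueryIPLocation.query can also be written as a lookup table. The sketch below uses only the status codes that method already handles. Unlike the original, it waits between retries, for example when status 345 (the per-minute limit) comes back, instead of retrying immediately. The fetch callable, the delay parameter and the doubling backoff are assumptions for illustration. They are not part of the original script or of any documented Baidu API behavior.

```python
import time

# Statuses that end the retry loop, taken from those handled in QueryIPLocation.query
FINAL_STATUS = {
    0: "SUCCESS",
    1: "SERVER_ERROR",
    2: "INVALID_PARAMETER",
    5: "INVALID_AK",
    101: "DISABLE_SERVICE",
    355: "DAY_QUERY_LIMIT",
}
RETRYABLE = {345: "MINUTE_QUERY_LIMIT"}  # worth retrying after a pause


def query_status(fetch, retries=5, delay=1.0):
    """Call fetch() until it yields a final status; return (status, reply or None).

    fetch is a hypothetical callable that returns the decoded JSON dict
    or raises an exception on network errors.
    """
    status = "Failed"
    for attempt in range(retries + 1):
        try:
            reply = fetch()
        except Exception:
            status = "Failed"
        else:
            code = reply.get("status")
            if code in FINAL_STATUS:
                return FINAL_STATUS[code], reply
            status = RETRYABLE.get(code, "Failed")
        if attempt < retries:
            time.sleep(delay * 2 ** attempt)  # hypothetical backoff: delay, 2*delay, 4*delay, ...
    return status, None
```

Adding a new status code then means adding one dictionary entry rather than another elif branch.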

Running the full Python script with time python3 ***.py gives:

real    15m46.962s
user    0m38.042s
sys     1m0.162s

The run took 15 minutes 46 seconds of wall-clock time.
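The time output also shows how little CPU the program uses. User plus sys time is about 1m38s (38.0 s + 60.2 s) out of a real time of 15m47s, roughly 10%. The threads spend nearly all their time waiting on HTTP responses, and CPython releases the GIL while a thread is blocked waiting, so Python threads can still overlap that waiting. The sketch below illustrates this in a self-contained way, with time.sleep standing in for a network round trip (an assumption: no real requests are made):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(_):
    time.sleep(0.1)  # stands in for waiting on an HTTP response; the GIL is released here
    return "SUCCESS"


def elapsed(n_tasks, n_threads):
    """Wall-clock seconds to run n_tasks fake requests on n_threads threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        list(pool.map(fake_request, range(n_tasks)))
    return time.perf_counter() - start


if __name__ == "__main__":
    # 20 threads, matching threadCount in the script above
    print(f"1 thread:   {elapsed(20, 1):.2f} s")
    print(f"20 threads: {elapsed(20, 20):.2f} s")
```

With one thread the 20 waits run back to back, so the run takes at least 2 seconds. With 20 threads they overlap, and the total is close to a single wait. CPU-bound work would not benefit this way under the GIL.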

 

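Since I had always worked with Java before, I got curious about whether a multithreaded Python or Java implementation would run faster. So I implemented the same requirements in Java too, with the same number of threads (20 query threads and 2 file-writing threads).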

Here is the Java implementation:

    

package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import net.sf.json.JSONObject;
import util.HttpUtil;
import util.JSONUtil;

public class IPQueryTest {
	
   
    
	public static void main(String[] args) throws InterruptedException {
		
		long start = System.currentTimeMillis();
		BlockingQueue<IPStruct> fileQueue = new LinkedBlockingQueue<IPStruct>();
		BlockingQueue<IPStruct> ipQueue = new LinkedBlockingQueue<IPStruct>();
		
		ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new ThreadReadFile(fileQueue));

		for(int i=0;i<20;i++){
			exec.execute(new ThreadQueryIPLocation(fileQueue,ipQueue));
		}

		for(int i=0;i<2 ;i++){
			exec.execute(new ThreadWriteFile(ipQueue));
		}
		
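		// Poll until both queues are empty, then flush whatever is still buffered.
		// Caveat: the queues can be empty while query or writer threads still hold
		// items, so in-flight results may be missed. The pool threads are also never
		// shut down, so the JVM keeps running after "total cost" is printed.
		while(true){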
			Thread.sleep(10);
			if(fileQueue.size() == 0 && ipQueue.size() == 0){
				ThreadWriteFile.saveResult(ThreadWriteFile.SUCC_PATH, ThreadWriteFile.succIPStructs);
				ThreadWriteFile.saveResult(ThreadWriteFile.FAIL_PATH, ThreadWriteFile.failIPStructs);
				long stend = System.currentTimeMillis();
				System.out.println("total cost:" + (stend - start));
				break;
			}
			/*else{
				System.out.println("fileQueue size:" + fileQueue.size() + ",ipQueue size:" + ipQueue.size());
			}*/
		}
	}
}

class IPStruct {
	private String startIP;
	private String endIP;
	private String address;
	private String position;
	private String status;

	public String getStartIP() {
		return startIP;
	}

	public void setStartIP(String startIP) {
		this.startIP = startIP;
	}

	public String getEndIP() {
		return endIP;
	}

	public void setEndIP(String endIP) {
		this.endIP = endIP;
	}

	public String getAddress() {
		return address;
	}

	public void setAddress(String address) {
		this.address = address;
	}

	public String getPosition() {
		return position;
	}

	public void setPosition(String position) {
		this.position = position;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		this.status = status;
	}
}

class ThreadReadFile implements Runnable{
	private final BlockingQueue<IPStruct> queue;
	private static final String FILE_PATH = "D:\\ip.txt";

	ThreadReadFile(BlockingQueue<IPStruct> q){
		this.queue = q;
	}
	
	@Override
	public void run() {
		try{
			produceIPS(queue);
		}catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	
	public void produceIPS(BlockingQueue<IPStruct> queue)throws InterruptedException{
		File file = null;
		BufferedReader reader = null;
		try{
			file = new File(FILE_PATH);
			reader = new BufferedReader(new FileReader(file));
			String line = null;
			IPStruct ipStruct = null;
			while((line = reader.readLine()) != null){
				ipStruct = formatIPStruct(line);
				if(null != ipStruct){
					queue.add(ipStruct);
				}
			}
		}catch(IOException e){
			e.printStackTrace();
		}finally{
			if(null != reader){
				try {
					reader.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	public IPStruct formatIPStruct(String line){
		String[] addr = line.split("__");
		if(addr.length <= 2 || addr[0].indexOf(".")== -1 || addr[1].indexOf(".") == -1){
			return null;
		}
		IPStruct ipStruct = new IPStruct();
		ipStruct.setStartIP(addr[0]);
		ipStruct.setEndIP(addr[1]);
		ipStruct.setAddress(addr[2]);
		return ipStruct;
	}
}

class ThreadQueryIPLocation implements Runnable{
	private final BlockingQueue<IPStruct> inputQueue;
	private final BlockingQueue<IPStruct> outputQueue;
	
	ThreadQueryIPLocation(BlockingQueue<IPStruct> inputQueue,BlockingQueue<IPStruct> outputQueue){
		this.inputQueue = inputQueue;
		this.outputQueue = outputQueue;
	}
	@Override
	public void run() {
		try{
			while(true){
				IPStruct ipStruct = inputQueue.take();
				queryIPLocation(ipStruct);
				outputQueue.add(ipStruct);
			}
		}
		catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	
	private static final int RETRY = 5;
	private static final String BASE_URL = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll";
	
	public void queryIPLocation(IPStruct obj) {
		String url = null;
		String result = null;
		url = BASE_URL + "&ip=" + obj.getStartIP();
		//System.out.println("url-->" + url);
		for (int j = 0; j <= RETRY; j++) {
			try {
				result = HttpUtil.getJsonStringFromUrl(url);
			} catch (Exception e) {
				if (j == RETRY) {
					obj.setStatus("Failed");
					break;
				}
				continue;
			}

			if (null != result) {
				JSONObject jsonObject = JSONUtil.toJSONObject(result);
				Integer status = (Integer) jsonObject.get("status");
				if (status == 0) {
					obj.setStatus("SUCCESS");
					procOK(jsonObject, obj);
					break;
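				} else if (status == 1) {
					obj.setStatus("SERVER_ERROR");
					break;
				} else if (status == 2) {
					// Also handled by the Python version; without it the request is retried to no avail
					obj.setStatus("INVALID_PARAMETER");
					break;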
				} else if (status == 5) {
					obj.setStatus("INVALID_AK");
					break;
				} else if (status == 101) {
					obj.setStatus("DISABLE_SERVICE");
					break;
				} else if (status == 345) {
					if (j == RETRY) {
						obj.setStatus("MINUTE_QUERY_LIMIT");
					}
				} else if (status == 355) {
					obj.setStatus("DAY_QUERY_LIMIT");
					break;
				}
			}
		}
	}

	public void procOK(JSONObject result, IPStruct obj) {
		JSONObject contentJSON = JSONUtil.toJSONObject(result.get("content"));
		JSONObject pointJSON = JSONUtil.toJSONObject(contentJSON.get("point").toString());
		obj.setAddress((String) contentJSON.get("address"));
		obj.setPosition(pointJSON.get("x") + "," + pointJSON.get("y"));
	}
}

class ThreadWriteFile implements Runnable{
	
	public static final List<IPStruct> succIPStructs = new ArrayList<IPStruct>();
	public static final List<IPStruct> failIPStructs = new ArrayList<IPStruct>();
	public static final String SEPARATOR = "|";
	public static final int FILE_SAVE_SIZE = 100;
	public static final String SUCC_PATH = "D:\\success.txt";
	public static final String FAIL_PATH = "D:\\fail.txt";
	public static AtomicInteger counter = new AtomicInteger(0);
	
	
	private final BlockingQueue<IPStruct> queue;
	
	ThreadWriteFile(BlockingQueue<IPStruct> queue){
		this.queue = queue;
	}
	
	@Override
	public void run() {
		try{
			while(true){
				IPStruct ipStruct = queue.take();
				count();
				// Compare string contents with equals(); == only compares references
				if("SUCCESS".equals(ipStruct.getStatus())){
					saveSuccIPStruct(ipStruct);
				}
				else{
					saveFailIPStruct(ipStruct);
				}
			}
		}
		catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	private void count(){
		// incrementAndGet() is already atomic, so no synchronization is needed
		System.out.println("count:" + counter.incrementAndGet());
	}
	
	public static void writeResult(String lines,String path){
		if(null == lines || lines.trim().equals("")){
			return;
		}
		// try-with-resources closes the writer even if write() throws
		try (FileWriter writer = new FileWriter(path,true)) {
			writer.write(lines);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	// static synchronized: both buffers are static and shared by the two writer
	// threads, so the lock must be the class itself, not each ThreadWriteFile instance
	public static synchronized void saveSuccIPStruct(IPStruct ipStruct){
		succIPStructs.add(ipStruct);
		if(succIPStructs.size() >= FILE_SAVE_SIZE){
			StringBuffer lines = new StringBuffer();
			for(IPStruct succIPStruct : succIPStructs){
				lines.append(succIPStruct.getStartIP()).append(SEPARATOR);
				lines.append(succIPStruct.getEndIP()).append(SEPARATOR);
				lines.append(succIPStruct.getAddress()).append(SEPARATOR);
				lines.append(succIPStruct.getPosition()).append("\n");
			}
			writeResult(lines.toString(),SUCC_PATH);
			succIPStructs.clear();
		}
	}
	
	public static synchronized void saveFailIPStruct(IPStruct ipStruct){
		failIPStructs.add(ipStruct);
		if(failIPStructs.size() >= FILE_SAVE_SIZE){
			StringBuffer lines = new StringBuffer();
			for(IPStruct failIPStruct : failIPStructs){
				lines.append(failIPStruct.getStartIP()).append(SEPARATOR);
				lines.append(failIPStruct.getEndIP()).append(SEPARATOR);
				lines.append(failIPStruct.getAddress()).append(SEPARATOR);
				lines.append(failIPStruct.getStatus()).append("\n");
			}
			writeResult(lines.toString(),FAIL_PATH);
			failIPStructs.clear();
		}
	}
	
	public static void saveResult(String path,List<IPStruct> ipStructs){
		StringBuffer lines = new StringBuffer();
		for(IPStruct ipStruct : ipStructs){
			lines.append(ipStruct.getStartIP()).append(SEPARATOR);
			lines.append(ipStruct.getEndIP()).append(SEPARATOR);
			lines.append(ipStruct.getAddress()).append(SEPARATOR);
			// Compare string contents with equals(); == only compares references
			if("SUCCESS".equals(ipStruct.getStatus())){
				lines.append(ipStruct.getPosition()).append("\n");
			}
			else{
				lines.append(ipStruct.getStatus()).append("\n");
			}
		}
		writeResult(lines.toString(),path);
	}
}

The Java code was run in Eclipse on Windows:

    

total cost:1132948

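The total cost is printed in milliseconds (System.currentTimeMillis()), so 1,132,948 ms is about 1,133 seconds, or roughly 18 minutes 53 seconds. On wall-clock time alone, the Java version was therefore about 3 minutes slower than the Python version (15 minutes 46 seconds). Experienced developers often say that Python puts programmer productivity ahead of absolute performance, so this single run should not be read as a verdict on either language. The 3-minute gap may well come from problems in my own code.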
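To compare the two runs in the same unit, the Java figure (milliseconds) has to be divided by 1000. The Python figure is the real line of the time output. A small helper for the conversion (to_min_sec is a name chosen for illustration):

```python
def to_min_sec(seconds):
    """Split a duration in seconds into whole minutes and remaining seconds."""
    minutes, secs = divmod(seconds, 60)
    return int(minutes), round(secs, 3)


python_real_s = 15 * 60 + 46.962   # "real 15m46.962s" from `time`
java_total_s = 1132948 / 1000      # "total cost:1132948" is in milliseconds

if __name__ == "__main__":
    print("Python:", to_min_sec(python_real_s))
    print("Java:  ", to_min_sec(java_total_s))
    print("Java - Python:", to_min_sec(java_total_s - python_real_s))
```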

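I'm not really writing this to decide whether Python or Java is better. Comparing the two is simply a good way for me to learn, and I wanted to keep a record of it.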
