`
dieslrae
  • 浏览: 34719 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

重构第一次写的线程池

阅读更多
最近没有什么学习欲望,修改之前的线程池的计划一直搁置,这几天比较闲,还是做了一次重构,由之前的2个类拆分为现在的4个类.

1、首先是工作线程类:TaskThread,此类为一个工作线程,用于完成一个工作任务,提供等待(wait),继续(proceed),绑定任务(bindTask)等方法
#!/usr/bin/env python
# -*- coding:utf8 -*-

import threading

class TaskThread(threading.Thread):
    def __init__(self):
        super(TaskThread,self).__init__()
        self._e = threading.Event()
        self.setDaemon(True)
        self.isReady(True)
        self.isActive(True)
       
    def run(self):
        while self.isActive():
            try:
                self.task()
            except:
                pass
            finally:
                self.wait()

    def wait(self):  
        self.isReady(True)
        self._e.clear()
        self._e.wait()

    def proceed(self):
        self.isReady(False)
        self._e.set()
        
        if not self.isAlive() and self.isActive():
            self.start()

    def isReady(self,flag = None):
        if isinstance(flag,bool):
            self.is_ready = flag
    
        return self.is_ready

    def isActive(self,flag = None):
         if isinstance(flag,bool):
            self.is_active = flag
    
        return self.is_active 
    
    def bindTask(self,task):
        self.task = task



2、线程池类(ThreadPool),此类是一个单例,用于模拟一个池,并提一个同步方法getThread用于获取池中线程,若池已经空了,返回None
#!/usr/bin/env python
# -*- coding:utf8 -*-

import threading
from TaskThread import TaskThread

class ThreadPool(object):
    __instance = None
    __lock = threading.Lock()

    def __init__():
        pass

    @classmethod
    def getInstance(self):
        self.__lock.acquire()
        if not self.__instance:
            self.__instance = super(ThreadPool,self).__new__(self)
        self.__lock.release()
        
        return self.__instance

    def initPool(self,pool_min_size = 5,pool_max_size = 10):
        self.pool_min_size = pool_min_size
        self.pool_max_size = pool_max_size
        self.pool = []

        for i in range(self.pool_min_size):
            self.pool.append(TaskThread())

    def getThread(self):
        th = None
        
        self.__lock.acquire()
        for t in self.pool:
            if not t._e.isSet() and t.isReady():
                t.isReady(False)
                th = t
                break
        self.__lock.release()

        if th is None and len(self.pool) < self.pool_max_size:
            th = TaskThread()
            self.pool.append(th)
        
        return th



3、任务队列类(TaskQueue),此类是一个单例,对Queue进行了简单的封装
#!/usr/bin/env python
# -*- coding:utf8 -*-

import threading

from Queue import Queue

class TaskQueue(object):
    __instance = None
    __lock = threading.Lock()
    
    def __init__():
        pass

    @classmethod
    def getInstance(self):
        self.__lock.acquire()
        if not self.__instance:
            self.__instance = super(TaskQueue,self).__new__(self)
        self.__lock.release()
        
        return self.__instance

    def initQueue(self,task_queue_size = 100):
        self.tasks = Queue(task_queue_size)

    def getTask(self):
        try:
            return self.tasks.get(0)
        except:
            raise Exception,'This queue is empty.'

    def addTask(self,task):
        try:
            self.tasks.put(task,0)
        except:
            raise Exception,'This queue is full.'



4、线程池管理类(ThreadPoolManager),此类负责从任务队列中获取任务,然后绑定到线程池中获取空闲线程中执行.
#!/usr/bin/env python
# -*- coding:utf8 -*-

import threading

class ThreadPoolManager(threading.Thread):
    def __init__(self,pool,tasks):
        super(ThreadPoolManager,self).__init__()
        self.setDaemon(True)

        self.pool = pool
        self.tasks = tasks
        
    def run(self):
        while True:
            t = self.pool.getThread()
            
            if t is not None:
                try:
                    task = self.tasks.getTask()
                except:
                    t.isReady(True)
                else:
                    t.bindTask(task)
                    t.proceed()




第一次重构算是完成了,目前就差一个调节负载的轮询线程,不过暂时没有想到一个好点的调节策略,只有搁置...
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics