加入收藏 | 设为首页 | 会员中心 | 我要投稿 PHP编程网 - 黄冈站长网 (http://www.0713zz.com/)- 数据应用、建站、人体识别、智能机器人、语音技术!
当前位置: 首页 > 教程 > 正文

Python线程池达成

发布时间:2021-11-18 14:19:45 所属栏目:教程 来源:互联网
导读:Python 的线程池主要有threadpool,不过它并不是内置的库,每次使用都需要安装,而且使用起来也不是那么好用,所以自己写了一个线程池实现,每次需要使用直接import即可。其中还可以根据传入的特征量handlerkey来获取每个任务的结果。 #!/bin/env python # -

Python 的线程池主要有threadpool,不过它并不是内置的库,每次使用都需要安装,而且使用起来也不是那么好用,所以自己写了一个线程池实现,每次需要使用直接import即可。其中还可以根据传入的特征量handlerkey来获取每个任务的结果。
 
#!/bin/env python
# -*- coding:utf-8 -*-
 
"""
@lx
created on 2016-04-14
"""
 
import Queue
import sys
import threading
import time
import StringIO
import traceback
 
reload(sys)
sys.setdefaultencoding("utf8")
 
 
class MyThread(threading.Thread):
    """Background thread connected to the requests/results queues."""
    def __init__(self, workQueue, resultQueue, timeout=0.1, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(True)
        self._workQueue = workQueue
        self._resultQueue = resultQueue
        self._timeout = timeout
        self._dismissed = threading.Event()
        self.start()
 
    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():
                break
 
            handlerKey = None  # unique key
            code = 0  # callback return code
            handlerRet = None
            errMsg = ""
 
            try:
                callable, args, kwds = self._workQueue.get(True, self._timeout)
            except Queue.Empty:
                continue
            except:
                exceptMsg = StringIO.StringIO()
                traceback.print_exc(file=exceptMsg)
                errMsg = exceptMsg.getvalue()
                code = 3301  # system error
                self._resultQueue.put(
                        (handlerKey, code, (callable, args, kwds), errMsg))
                break
 
            if self._dismissed.isSet():
                self._workQueue.put((callable, args, kwds))
                break
 
            try:
                if "handlerKey" in kwds:
                    handlerKey = kwds["handlerKey"]
                handlerRet = callable(*args, **kwds)  # block
                self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
            except:
                exceptMsg = StringIO.StringIO()
                traceback.print_exc(file=exceptMsg)
                errMsg = exceptMsg.getvalue()
                code = 3303
                self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
 
    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()
 
 
class ThreadPool(object):
    def __init__(self, workerNums=3, timeout=0.1):
        self._workerNums = workerNums
        self._timeout = timeout
        self._workQueue = Queue.Queue()  # no maximum
        self._resultQueue = Queue.Queue()
        self.workers = []
        self.dismissedWorkers = []
        self._createWorkers(self._workerNums)
 
    def _createWorkers(self, workerNums):
        """Add num_workers worker threads to the pool."""
        for i in range(workerNums):
            worker = MyThread(self._workQueue, self._resultQueue,
                              timeout=self._timeout)
            self.workers.append(worker)
 
    def _dismissWorkers(self, workerNums, _join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismissList = []
        for i in range(min(workerNums, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismissList.append(worker)
 
        if _join:
            for worker in dismissList:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismissList)
 
    def _joinAllDissmissedWorkers(self):
        """
        Perform Thread.join() on all
        worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []
 
    def addJob(self, callable, *args, **kwds):
        self._workQueue.put((callable, args, kwds))
 
    def getResult(self, block=False, timeout=0.1):
        try:
            item = self._resultQueue.get(block, timeout)
            return item
        except Queue.Empty, e:
            return None
        except:
            raise
 
    def waitForComplete(self, timeout=0.1):
        """
        Last function. To dismiss all worker threads. Delete ThreadPool.
        :param timeout
        """
        while True:
            workerNums = self._workQueue.qsize()  # 释放掉所有线程
            runWorkers = len(self.workers)
 
            if 0 == workerNums:
                time.sleep(timeout)  # waiting for thread to do job
                self._dismissWorkers(runWorkers)
                break
            # if workerNums < runWorkers:  # 不能这样子乱取消
            #    self._dismissWorkers(runWorkers - workerNums)
            time.sleep(timeout)
        self._joinAllDissmissedWorkers()
 
 
if "__main__" == __name__:
    test1 = """
    def doSomething(*args, **kwds):
        if "sleep" in kwds:
            sleep = kwds["sleep"]
        msgTxt = "sleep %fs.." % sleep
        time.sleep(sleep)
        return msgTxt
 
    for i in range(10):
        print doSomething(sleep=0.1, handlerKey="key-%d"%i)
 
    wm = ThreadPool(10)
    for i in range(10):
        wm.addJob(doSomething, sleep=1, handlerKey="key-%d"%i)
    wm.waitForComplete()
    for i in range(10):
        print wm.getResult()
    del wm
    """
    # test2 = """
 
    def doSomething_(*args, **kwds):
        sleep = int(args[0])
        msgTxt = "sleep %ds.." % sleep
        time.sleep(sleep)
        return msgTxt
 
 
    wm = ThreadPool(10)
    result = []
    for i in range(10):
        data = 5
        wm.addJob(doSomething_, data)
 
    while 1:
        res = wm.getResult()
        if res:
            result.append(res)
        if 10 == len(result):
            break
        print "sleep 0.1"
        time.sleep(0.1)
    print time.time()
    wm.waitForComplete()
    print time.time()
# """

(编辑:PHP编程网 - 黄冈站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读