apppublic/aidocs/tworkers.md
2025-10-05 11:23:33 +08:00

9.0 KiB
Raw Permalink Blame History

多线程任务处理框架技术文档

本项目实现了一个基于 Python threadingqueue 模块的轻量级多线程任务调度系统支持异步执行可调用函数callable适用于 I/O 密集型任务如网络请求、文件读写等。


目录


1. 概述

该模块提供一个简单的线程池机制,通过预创建一组工作线程来异步执行提交的任务。所有任务通过队列传递给空闲的工作线程进行处理,避免频繁创建和销毁线程带来的开销。

主要特点:

  • 支持任意可调用对象函数、方法、lambda 等)作为任务。
  • 支持传递位置参数和关键字参数。
  • 提供优雅关闭机制等待所有任务完成。
  • 基于标准库,无外部复杂依赖(除使用场景中引入的第三方库)。

2. 核心组件

2.1 Worker

Worker 是继承自 threading.Thread 的工作线程类,负责从任务队列中获取并执行任务。

属性

属性 类型 描述
r_queue Queue 接收任务的任务队列实例
timeout int/float 获取任务时的超时时间(秒)
daemon bool 是否为守护线程(设为 False

⚠️ 注:虽然设置了 setDaemon(False),但实际默认行为是主线程退出后不会强制终止这些线程,需手动调用 wait_for_complete() 保证清理。

方法

__init__(self, rqueue, timeout=1)

初始化工作线程。

  • 参数
    • rqueue (Queue):任务队列对象。
    • timeout (float, 默认 1):从队列取任务的阻塞超时时间。

自动启动线程(调用 .start())。

run(self)

线程主循环逻辑:

  1. 循环尝试从 r_queue 中取出任务(格式为 [callable, args, kw])。
  2. 若取到的任务中 callableNone,则视为停止信号,线程退出。
  3. 否则执行 callable(*args, **kw)
  4. 队列为空时捕获 Empty 异常,并休眠 1 秒后继续尝试。

🛑 当前线程在 Empty 异常时使用 time.sleep(1),可能导致最多 1 秒延迟响应新任务或退出信号。

resulthandler(self, rez)

预留方法,用于处理任务返回结果。当前为空实现,用户可子类化重写此方法以实现回调功能。

扩展建议:若需结果收集,可在 add_job 时传入 result_callback 并在此方法中触发。


2.2 ThreadWorkers

管理多个 Worker 实例的线程池控制器。

属性

属性 类型 描述
workQueue Queue 存放待处理任务的队列
worker_cnt int 工作线程数量
workers list[Worker] 所有工作线程的列表

方法

__init__(self, num_workers=20)

构造函数,创建指定数量的工作线程。

  • 参数
    • num_workers (int, 默认 20):线程池大小。

内部调用 __createThreadPool(num_workers) 创建线程。

__createThreadPool(self, num)

私有方法,创建 numWorker 实例并加入 self.workers 列表。

  • 参数
    • num (int):要创建的线程数。

每个线程共享同一个 workQueue

add_job(self, callable, args=[], kw={})

向任务队列添加一个任务。

  • 参数
    • callable (callable):可调用对象(函数等)。若为 None,表示停止信号。
    • args (list):位置参数列表。
    • kw (dict):关键字参数字典。

💡 示例:tw.add_job(print, ['Hello'], {'end': '!\\n'})

任务以 [callable, args, kw] 形式放入队列。

wait_for_complete(self)

等待所有任务执行完毕并回收线程资源。

操作流程:

  1. 向队列发送 worker_cntNone 任务(作为每个线程的退出信号)。
  2. 遍历 workers 列表,逐个调用 join() 等待其结束。
  3. 清空 workers 列表。

必须调用此方法确保所有线程正常退出,防止程序挂起。


3. 使用示例

以下是一个使用 requests 发起大量 HTTP 请求的示例:

import requests
from thread_worker import ThreadWorkers  # 假设保存为 thread_worker.py

def get(url):
    x = requests.get(url)
    print(x.status_code)

# 创建线程池默认20个线程
tw = ThreadWorkers()

# 添加10000个任务
for i in range(10000):
    tw.add_job(get, ['http://www.baidu.com'])

# 等待所有任务完成
tw.wait_for_complete()
print('finished')

输出示例:

200
200
...
finished

4. API 参考

函数/类 签名 说明
Worker Worker(rqueue: Queue, timeout: float = 1) 工作线程,自动启动
Worker.run() -> None 主循环,处理任务或退出
Worker.resulthandler(rez) (rez: Any) -> None 预留结果处理器
ThreadWorkers ThreadWorkers(num_workers: int = 20) 初始化线程池
ThreadWorkers.add_job() (callable, args: list = [], kw: dict = {}) 添加任务
ThreadWorkers.wait_for_complete() () -> None 发送退出信号并等待所有线程结束

5. 注意事项与限制

⚠️ 已知问题与改进建议

  1. 缺少 import time

    • Worker.run() 中使用了 time.sleep(1),但未导入 time 模块。
    • 错误:运行时报 NameError: name 'time' is not defined
    • 修复:在文件顶部添加 import time
  2. 任务结果无法获取

    • 当前设计不支持任务返回值的传递。
    • 建议扩展:可通过 concurrent.futures.Future 或回调函数机制增强。
  3. 高延迟响应

    • get(timeout=self.timeout) + except Empty: sleep(1) 导致最大延迟达 timeout + 1 秒。
    • 建议:将 sleep(1) 改为更小值或移除,直接依赖 get(timeout) 轮询。
  4. 不可复用线程池

    • 调用 wait_for_complete() 后线程已退出,不能再次提交任务。
    • 如需复用,应重新设计“动态增减线程”或“重启线程池”机制。
  5. 异常未捕获

    • 任务执行过程中抛出异常会中断线程。
    • 建议在外层加 try-except 包裹 callable(*args,**kw) 并记录错误。
  6. 线程安全

    • Queue 是线程安全的,整体结构合理。
    • 不推荐手动修改 workersworkQueue

6. 依赖项

  • Python 3.6+
  • 标准库:
    • sys
    • threading
    • queue
    • time(需显式导入,原代码遗漏)

可选第三方库(仅示例使用):

  • requests:用于演示网络请求任务

附录:修复后的完整代码(建议版本)

import sys
import threading
import time  # 缺失的导入
from threading import Thread
from queue import Queue, Empty


class Worker(Thread):
    def __init__(self, rqueue, timeout=1):
        Thread.__init__(self)
        self.timeout = timeout
        self.setDaemon(False)  # 主线程结束后是否强制退出
        self.r_queue = rqueue
        self.start()

    def run(self):
        while True:
            try:
                task = self.r_queue.get(timeout=self.timeout)
                if task is None or task[0] is None:
                    break
                callable_func, args, kw = task
                try:
                    result = callable_func(*args, **kw)
                    self.resulthandler(result)
                except Exception as e:
                    print(f"Task error: {e}", file=sys.stderr)
            except Empty:
                continue  # 继续轮询,无需额外 sleep

    def resulthandler(self, rez):
        pass


class ThreadWorkers:
    def __init__(self, num_workers=20):
        self.workQueue = Queue()
        self.worker_cnt = num_workers
        self.workers = []
        self.__createThreadPool(num_workers)

    def __createThreadPool(self, num):
        for _ in range(num):
            thread = Worker(self.workQueue)
            self.workers.append(thread)

    def wait_for_complete(self):
        for _ in range(self.worker_cnt):
            self.workQueue.put([None, None, None])  # 发送退出信号

        while self.workers:
            worker = self.workers.pop()
            if worker.is_alive():
                worker.join()

    def add_job(self, callable_func, args=None, kw=None):
        if args is None:
            args = []
        if kw is None:
            kw = {}
        self.workQueue.put([callable_func, args, kw])


if __name__ == '__main__':
    import requests

    def get(url):
        x = requests.get(url)
        print(x.status_code)

    tw = ThreadWorkers(num_workers=10)
    for i in range(100):
        tw.add_job(get, ['http://www.baidu.com'])
    tw.wait_for_complete()
    print('finished')

此版本修复了 time 导入缺失、异常处理、延迟优化等问题,更具健壮性。