Python - 线程池



线程池是一种自动高效管理多个线程的机制,允许并发执行任务。Python 没有通过threading模块直接提供线程池。

相反,它通过multiprocessing.dummy模块和concurrent.futures模块提供基于线程的池化。这些模块提供了方便的接口来创建和管理线程池,使并发任务执行更容易。

什么是线程池?

线程池是线程的集合,由一个池管理。池中的每个线程称为工作线程或工作者线程。这些线程可以重复使用来执行多个任务,从而减少了重复创建和销毁线程的负担。

线程池控制线程的创建及其生命周期,使其能够更有效地处理大量任务。

我们可以使用以下类在 Python 中实现线程池:

  • Python ThreadPool 类
  • Python ThreadPoolExecutor 类

使用 Python ThreadPool 类

multiprocessing.pool.ThreadPool类在multiprocessing模块中提供了一个线程池接口。它管理一个工作线程池,可以将作业提交到该池以进行并发执行。

ThreadPool对象通过处理工作线程之间的任务创建和分配来简化多个线程的管理。它与最初为进程设计的Pool类共享一个接口,但已调整为也能与线程一起使用。

ThreadPool实例与Pool实例完全接口兼容,应作为上下文管理器进行管理,或者手动调用close()和terminate()。

示例

此示例演示了使用Python线程池对数字列表并行执行square和cube函数,其中每个函数并发应用于数字,最多使用3个线程,每个线程在执行之间延迟1秒。

from multiprocessing.dummy import Pool as ThreadPool
import time

def square(number):
   sqr = number * number
   time.sleep(1)
   print("Number:{} Square:{}".format(number, sqr))

def cube(number):
   cub = number*number*number
   time.sleep(1)
   print("Number:{} Cube:{}".format(number, cub))

numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(3)
pool.map(square, numbers)
pool.map(cube, numbers)

pool.close()

输出

执行上述代码后,您将获得以下输出:

Number:2 Square:4
Number:1 Square:1
Number:3 Square:9
Number:4 Square:16
Number:5 Square:25
Number:1 Cube:1
Number:2 Cube:8
Number:3 Cube:27
Number:4 Cube:64
Number:5 Cube:125

使用 Python ThreadPoolExecutor 类

Pythonconcurrent.futures模块的ThreadPoolExecutor类提供了一个高级接口,用于使用线程异步执行函数。concurrent.futures模块包括Future类和两个Executor类:ThreadPoolExecutorProcessPoolExecutor

Future 类

concurrent.futures.Future类负责处理任何可调用对象(例如函数)的异步执行。要获取Future对象,您应该在任何Executor对象上调用submit()方法。不应通过其构造函数直接创建它。

Future类中的重要方法有:

  • result(timeout=None):此方法返回调用返回的值。如果调用尚未完成,则此方法将等待最多timeout秒。如果调用在timeout秒内未完成,则将引发TimeoutError。如果未指定timeout,则等待时间没有限制。
  • cancel():此方法尝试取消调用。如果调用当前正在执行或已完成运行且无法取消,则该方法将返回布尔值False。否则,调用将被取消,该方法返回True。
  • cancelled():如果调用已成功取消,则返回True。
  • running():如果调用当前正在执行且无法取消,则返回True。
  • done():如果调用已成功取消或已完成运行,则返回True。

ThreadPoolExecutor 类

此类表示一个指定最大工作线程数的线程池,用于异步执行调用。

concurrent.futures.ThreadPoolExecutor(max_threads)

示例

这是一个使用 concurrent.futures.ThreadPoolExecutor 类在 Python 中异步管理和执行任务的示例。具体来说,它展示了如何将多个任务提交到线程池以及如何检查它们的执行状态。

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def square(numbers):
   for val in numbers:
      ret = val*val
      sleep(1)
      print("Number:{} Square:{}".format(val, ret))
def cube(numbers):
   for val in numbers:
      ret = val*val*val
      sleep(1)
      print("Number:{} Cube:{}".format(val, ret))
if __name__ == '__main__':
   numbers = [1,2,3,4,5]
   executor = ThreadPoolExecutor(4)
   thread1 = executor.submit(square, (numbers))
   thread2 = executor.submit(cube, (numbers))
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())
   sleep(2)
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())

它将产生以下输出 -

Thread 1 executed ? : False
Thread 2 executed ? : False
Number:1 Square:1
Number:1 Cube:1
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:2 Square:4
Number:2 Cube:8
Number:3 Square:9
Number:3 Cube:27
Number:4 Square:16
Number:4 Cube:64
Number:5 Square:25
Number:5 Cube:125
广告

© . All rights reserved.