如何使用 Python 实现多线程队列
简介..
在这个例子中,我们将创建一个任务队列,它保存所有要执行的任务,以及一个与队列交互以单独处理其元素的线程池。
我们将从问题开始,什么是队列?队列是一种数据结构,它是一组以非常特定的顺序维护的不同元素的集合。让我举一个现实生活中的例子来解释。
假设你在杂货店柜台排队支付杂货账单(别问我哪家杂货店)
在排队等候支付账单的人群中,你会注意到以下几点
1. 人们从队列的一端进入,从另一端离开。
2. 如果 A 先生在 B 先生之前进入队列,则 A 先生会在 B 先生之前离开队列(除非 B 先生是名人或有更高的优先级)。
3. 一旦每个人都支付了账单,队列中将不再有人。
好了,回到编程中,队列的工作方式与此类似。
1. 入队 - 元素添加到队列的末尾。
2. 出队 - 从队列的开头删除元素。
还有更多,先进先出 (FIFO) - 最先添加的元素将最先被删除。后进先出 (LIFO) - 最后添加的元素将最先被删除。
Python 如何实现队列数据结构?
Python 中的 queue 模块提供了队列数据结构的简单实现。每个队列可以具有以下方法。
get(): 返回下一个元素。
put(): 添加一个新元素。
qsize(): 队列中当前元素的数量。
empty(): 返回一个布尔值,指示队列是否为空。
full(): 返回一个布尔值,指示队列是否已满。
1. 我们将创建一个函数,它接受一个参数 x,然后迭代 1 到自身 (x) 之间的数字,以执行乘法。例如,当你将 5 传递给此函数时,它会迭代 1 到 5 并继续进行乘法,即 1 乘以 5,2 乘以 5,3 乘以 5,4 乘以 5,5 乘以 5,最终将值作为列表返回。
示例
def print_multiply(x): output_value = [] for i in range(1, x + 1): output_value.append(i * x) print(f"Output \n *** The multiplication result for the {x} is - {output_value}") print_multiply(5)
输出
*** The multiplication result for the 5 is - [5, 10, 15, 20, 25]
2. 我们将编写另一个名为 process_queue() 的函数,该函数将尝试获取队列对象的下一个元素。此逻辑非常简单,继续传递元素,直到队列为空。我将使用 sleep 来稍微延迟处理。
示例
def process_queue(): while True: try: value = my_queue.get(block=False) except queue.Empty: return else: print_multiply(value) time.sleep(2)
3. 创建一个类,当初始化并启动新实例时,将调用 process_queue() 函数。
示例
class MultiThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): print(f" ** Starting the thread - {self.name}") process_queue() print(f" ** Completed the thread - {self.name}")
4. 最后,我们将传递输入数字列表并填充队列。
# setting up variables input_values = [5, 10, 15, 20] # fill the queue my_queue = queue.Queue() for x in input_values: my_queue.put(x)
5. 最后,将所有内容放在一起。
import queue import threading import time # Class class MultiThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): print(f"Output \n ** Starting the thread - {self.name}") process_queue() print(f" ** Completed the thread - {self.name}") # Process thr queue def process_queue(): while True: try: value = my_queue.get(block=False) except queue.Empty: return else: print_multiply(value) time.sleep(2) # function to multiply def print_multiply(x): output_value = [] for i in range(1, x + 1): output_value.append(i * x) print(f" \n *** The multiplication result for the {x} is - {output_value}") # Input variables input_values = [2, 4, 6, 5,10,3] # fill the queue my_queue = queue.Queue() for x in input_values: my_queue.put(x) # initializing and starting 3 threads thread1 = MultiThread('First') thread2 = MultiThread('Second') thread3 = MultiThread('Third') thread4 = MultiThread('Fourth') # Start the threads thread1.start() thread2.start() thread3.start() thread4.start() # Join the threads thread1.join() thread2.join() thread3.join() thread4.join()
输出
** Starting the thread - First *** The multiplication result for the 2 is - [2, 4]
输出
** Starting the thread - Second *** The multiplication result for the 4 is - [4, 8, 12, 16]
输出
** Starting the thread - Third *** The multiplication result for the 6 is - [6, 12, 18, 24, 30, 36]
输出
** Starting the thread - Fourth *** The multiplication result for the 5 is - [5, 10, 15, 20, 25] *** The multiplication result for the 10 is - [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] *** The multiplication result for the 3 is - [3, 6, 9] ** Completed the thread - Third ** Completed the thread - Fourth ** Completed the thread - Second ** Completed the thread - First
6. 我们已成功实现了队列概念。看,我们有 4 个线程,但有 6 个值要处理,所以谁先到达队列就会被执行,其他线程将排队等待其他人完成。
这类似于现实生活,假设有 3 个柜台,但有 10 个人在等候支付账单,因此 10 个人将在 3 个队列中,谁先支付完账单就会离开队列,为下一个人腾出位置。