如何使用 Python 实现多线程队列


简介..

在这个例子中,我们将创建一个任务队列,它保存所有要执行的任务,以及一个与队列交互以单独处理其元素的线程池。

我们将从问题开始,什么是队列?队列是一种数据结构,它是一组以非常特定的顺序维护的不同元素的集合。让我举一个现实生活中的例子来解释。

假设你在杂货店柜台排队支付杂货账单(别问我哪家杂货店)

在排队等候支付账单的人群中,你会注意到以下几点

1. 人们从队列的一端进入,从另一端离开。

2. 如果 A 先生在 B 先生之前进入队列,则 A 先生会在 B 先生之前离开队列(除非 B 先生是名人或有更高的优先级)。

3. 一旦每个人都支付了账单,队列中将不再有人。

好了,回到编程中,队列的工作方式与此类似。

1. 入队 - 元素添加到队列的末尾。

2. 出队 - 从队列的开头删除元素。

还有更多,先进先出 (FIFO) - 最先添加的元素将最先被删除。后进先出 (LIFO) - 最后添加的元素将最先被删除。

Python 如何实现队列数据结构?

Python 中的 queue 模块提供了队列数据结构的简单实现。每个队列可以具有以下方法。

  • get(): 返回下一个元素。

  • put(): 添加一个新元素。

  • qsize(): 队列中当前元素的数量。

  • empty(): 返回一个布尔值,指示队列是否为空。

  • full(): 返回一个布尔值,指示队列是否已满。

1. 我们将创建一个函数,它接受一个参数 x,然后迭代 1 到自身 (x) 之间的数字,以执行乘法。例如,当你将 5 传递给此函数时,它会迭代 1 到 5 并继续进行乘法,即 1 乘以 5,2 乘以 5,3 乘以 5,4 乘以 5,5 乘以 5,最终将值作为列表返回。

示例

def print_multiply(x):
output_value = []
for i in range(1, x + 1):
output_value.append(i * x)
print(f"Output \n *** The multiplication result for the {x} is - {output_value}")
print_multiply(5)

输出

*** The multiplication result for the 5 is - [5, 10, 15, 20, 25]

2. 我们将编写另一个名为 process_queue() 的函数,该函数将尝试获取队列对象的下一个元素。此逻辑非常简单,继续传递元素,直到队列为空。我将使用 sleep 来稍微延迟处理。

示例

def process_queue():
while True:
try:
value = my_queue.get(block=False)
except queue.Empty:
return
else:
print_multiply(value)
time.sleep(2)

3. 创建一个类,当初始化并启动新实例时,将调用 process_queue() 函数。

示例

class MultiThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name

def run(self):
print(f" ** Starting the thread - {self.name}")
process_queue()
print(f" ** Completed the thread - {self.name}")

4. 最后,我们将传递输入数字列表并填充队列。

# setting up variables
input_values = [5, 10, 15, 20]

# fill the queue
my_queue = queue.Queue()
for x in input_values:
my_queue.put(x)

5. 最后,将所有内容放在一起。

import queue
import threading
import time

# Class
class MultiThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name

def run(self):
print(f"Output \n ** Starting the thread - {self.name}")
process_queue()
print(f" ** Completed the thread - {self.name}")

# Process thr queue
def process_queue():
while True:
try:
value = my_queue.get(block=False)
except queue.Empty:
return
else:
print_multiply(value)
time.sleep(2)

# function to multiply
def print_multiply(x):
output_value = []
for i in range(1, x + 1):
output_value.append(i * x)
print(f" \n *** The multiplication result for the {x} is - {output_value}")

# Input variables
input_values = [2, 4, 6, 5,10,3]

# fill the queue
my_queue = queue.Queue()
for x in input_values:
my_queue.put(x)
# initializing and starting 3 threads
thread1 = MultiThread('First')
thread2 = MultiThread('Second')
thread3 = MultiThread('Third')
thread4 = MultiThread('Fourth')

# Start the threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()

# Join the threads
thread1.join()
thread2.join()
thread3.join()
thread4.join()

输出

** Starting the thread - First
*** The multiplication result for the 2 is - [2, 4]

输出

** Starting the thread - Second
*** The multiplication result for the 4 is - [4, 8, 12, 16]

输出

** Starting the thread - Third
*** The multiplication result for the 6 is - [6, 12, 18, 24, 30, 36]

输出

** Starting the thread - Fourth
*** The multiplication result for the 5 is - [5, 10, 15, 20, 25]
*** The multiplication result for the 10 is - [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
*** The multiplication result for the 3 is - [3, 6, 9] ** Completed the thread - Third
** Completed the thread - Fourth
** Completed the thread - Second ** Completed the thread - First

6. 我们已成功实现了队列概念。看,我们有 4 个线程,但有 6 个值要处理,所以谁先到达队列就会被执行,其他线程将排队等待其他人完成。

这类似于现实生活,假设有 3 个柜台,但有 10 个人在等候支付账单,因此 10 个人将在 3 个队列中,谁先支付完账单就会离开队列,为下一个人腾出位置。

更新于: 2020-11-10

3K+ 浏览量

开启你的 职业生涯

通过完成课程获得认证

开始学习
广告