线程间通信



在现实生活中,如果一个团队的人在做一项共同的任务,那么他们之间就应该进行沟通才能正确完成任务。同样的比喻也适用于线程。在编程中,为了减少处理器的空闲时间,我们创建多个线程并将不同的子任务分配给每个线程。因此,必须有一个通信机制,并且它们应该相互交互以同步地完成工作。

请考虑以下与线程间通信相关的要点:

  • 没有性能提升 - 如果我们无法实现线程和进程之间的正确通信,那么并发和并行带来的性能提升就毫无用处。

  • 正确完成任务 - 如果没有线程之间的适当的互通机制,则无法正确完成分配的任务。

  • 比进程间通信更高效 - 线程间通信比进程间通信更高效、更容易使用,因为同一个进程中的所有线程共享相同的地址空间,不需要使用共享内存。

Python 用于线程安全通信的数据结构

多线程代码带来了一个问题:如何在不同的线程之间传递信息。标准的通信原语无法解决这个问题。因此,我们需要实现我们自己的复合对象,以便在线程之间共享对象以使通信线程安全。以下是一些经过修改后提供线程安全通信的数据结构:

集合 (Sets)

为了以线程安全的方式使用集合数据结构,我们需要扩展集合类来实现我们自己的锁机制。

示例

这是一个扩展 Python 集合类的示例:

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
	  try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()
  
   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

在上面的示例中,定义了一个名为 **extend_class** 的类对象,它进一步继承自 Python 的 **set 类**。一个锁对象在该类的构造函数中创建。现在,有两个函数 - **add()** 和 **delete()**。这些函数被定义并且是线程安全的。它们都依赖于 **super** 类的功能,但有一个关键例外。

装饰器 (Decorator)

这是另一个用于线程安全通信的关键方法,即使用装饰器。

示例

考虑一个 Python 示例,它展示了如何使用装饰器:

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

在上面的示例中,定义了一个名为 lock_decorator 的装饰器方法,它进一步继承自 Python 的方法类。然后,一个锁对象在该类的构造函数中创建。现在,有两个函数 - add() 和 delete()。这些函数被定义并且是线程安全的。它们都依赖于父类的功能,但有一个关键例外。

列表 (Lists)

列表数据结构是线程安全的,对于临时的内存存储来说既快速又容易。在 CPython 中,GIL(全局解释器锁)保护了对它们的并发访问。我们知道列表是线程安全的,但它们包含的数据呢?实际上,列表的数据并没有受到保护。例如,**L.append(x)** 并不能保证在另一个线程尝试做同样的事情时返回预期结果。这是因为,虽然 **append()** 是一个原子操作并且是线程安全的,但是另一个线程正在尝试并发地修改列表的数据,因此我们可以在输出中看到竞争条件的副作用。

为了解决这类问题并安全地修改数据,我们必须实现一个适当的锁机制,这可以进一步确保多个线程不会出现竞争条件。为了实现适当的锁机制,我们可以像在前面的示例中那样扩展类。

列表上的一些其他原子操作如下:

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

此处:

  • L、L1、L2 都是列表
  • D、D1、D2 是字典
  • x、y 是对象
  • i、j 是整数

队列 (Queues)

如果列表的数据没有受到保护,我们可能会面临后果。我们可能会获取或删除错误的数据项,或者出现竞争条件。这就是为什么建议使用队列数据结构的原因。队列的一个现实世界例子可以是一条单车道单行道,先进入的车辆先离开。在售票窗口和公交车站也可以看到更多现实世界的例子。

Queues

队列默认是线程安全的数据结构,我们不需要担心实现复杂的锁机制。Python 提供了模块来在我们的应用程序中使用不同类型的队列。

队列类型

在本节中,我们将学习不同类型的队列。Python 提供了三个从 **** 模块中使用的队列选项:

  • 普通队列 (FIFO,先进先出)
  • LIFO,后进先出
  • 优先级队列

我们将在后续章节中学习不同的队列。

普通队列 (FIFO,先进先出)

这是 Python 提供的最常用的队列实现。在这种排队机制中,先到者先服务。FIFO 也称为普通队列。FIFO 队列可以表示如下:

FIFO

Python FIFO 队列实现

在 Python 中,FIFO 队列可以用单线程和多线程实现。

单线程 FIFO 队列

为了用单线程实现 FIFO 队列,**Queue** 类将实现一个基本的先进先出的容器。使用 **put()** 将元素添加到序列的“一端”,使用 **get()** 从另一端删除元素。

示例

以下是使用单线程实现 FIFO 队列的 Python 程序:

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

输出

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

输出显示上述程序使用单线程来说明元素从队列中移除的顺序与插入顺序相同。

多线程 FIFO 队列

为了用多线程实现 FIFO,我们需要定义 myqueue() 函数,该函数从 queue 模块扩展而来。get() 和 put() 方法的工作原理与上面讨论的单线程 FIFO 队列实现中的相同。然后,为了使其成为多线程的,我们需要声明和实例化线程。这些线程将以 FIFO 方式使用队列。

示例

以下是使用多线程实现 FIFO 队列的 Python 程序:

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

输出

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO,后进先出队列

该队列与 FIFO(先进先出)队列使用了完全相反的类比。在这种排队机制中,最后来的人将先得到服务。这类似于实现堆栈数据结构。LIFO 队列在实现人工智能的深度优先搜索等算法时非常有用。

Python LIFO 队列实现

在 Python 中,LIFO 队列可以用单线程和多线程实现。

单线程 LIFO 队列

为了用单线程实现 LIFO 队列,**Queue** 类将使用结构 **Queue.LifoQueue** 实现一个基本的最后进入,先离开的容器。现在,调用 **put()** 时,元素添加到容器的头部,使用 **get()** 时也从头部移除。

示例

以下是使用单线程实现 LIFO 队列的 Python 程序:

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

输出显示上述程序使用单线程来说明元素从队列中移除的顺序与插入顺序相反。

多线程 LIFO 队列

实现方式与我们使用多线程实现 FIFO 队列的方式类似。唯一的区别是我们需要使用 **Queue** 类,它将使用结构 **Queue.LifoQueue** 实现一个基本的最后进入,先离开的容器。

示例

以下是使用多线程实现 LIFO 队列的 Python 程序:

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
	  print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join() 

输出

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

优先级队列

在 FIFO 和 LIFO 队列中,项目的顺序与插入顺序有关。但是,在许多情况下,优先级比插入顺序更重要。让我们考虑一个现实世界的例子。假设机场安检正在检查不同类别的乘客。VVIP、航空公司工作人员、海关官员等类别的乘客可能会优先检查,而不是像普通乘客那样根据到达顺序进行检查。

需要考虑的另一个重要方面是:如何开发任务调度器。一种常见的方案是根据优先级顺序服务队列中最紧急的任务。可以使用此数据结构根据项目的优先级值从队列中选取项目。

Python 优先级队列实现

在 Python 中,优先级队列可以用单线程和多线程实现。

单线程优先级队列

为了使用单线程实现优先级队列,**Queue** 类将使用结构 **Queue.PriorityQueue** 实现一个基于优先级的任务容器。现在,调用 **put()** 时,元素将添加一个值,其中值越小,优先级越高,因此使用 **get()** 时将首先检索。

示例

考虑以下使用单线程实现优先级队列的 Python 程序:

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

输出

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

在上面的输出中,我们可以看到队列已根据优先级存储项目——值越小,优先级越高。

多线程优先级队列

实现方式与使用多线程实现 FIFO 和 LIFO 队列的方式类似。唯一的区别是我们需要使用 **Queue** 类来使用结构 **Queue.PriorityQueue** 初始化优先级。另一个区别在于队列的生成方式。在下面给出的示例中,它将使用两个相同的数据集生成。

示例

以下 Python 程序有助于使用多线程实现优先级队列:

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

输出

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue
广告