Python - 线程间通信



线程间通信是指在 Python 多线程程序中启用线程之间通信和同步的过程。

通常,Python 中的线程共享进程中的同一内存空间,这允许它们通过共享变量、对象和threading模块提供的专用同步机制来交换数据并协调其活动。

为了促进线程间通信,threading 模块提供了各种同步原语,例如锁、事件、条件和信号量对象。在本教程中,您将学习如何使用 Event 和 Condition 对象在多线程程序中实现线程间的通信。

Event 对象

Event 对象管理内部标志的状态,以便线程可以等待或设置。Event对象提供控制此标志状态的方法,允许线程基于共享条件同步其活动。

该标志最初为假,使用 set() 方法设置为真,使用 clear() 方法重置为假。wait() 方法会阻塞,直到标志为真。

以下是Event对象的关键方法:

  • is_set():当且仅当内部标志为真时返回 True。
  • set():将内部标志设置为真。所有等待它变为真的线程都会被唤醒。一旦标志为真,调用 wait() 的线程根本不会阻塞。
  • clear():将内部标志重置为假。随后,调用 wait() 的线程将阻塞,直到调用 set() 将内部标志再次设置为真。
  • wait(timeout=None):阻塞直到内部标志为真。如果在进入时内部标志为真,则立即返回。否则,阻塞直到另一个线程调用 set() 将标志设置为真,或直到可选超时发生。当存在 timeout 参数且不为 None 时,它应该是一个浮点数,以秒为单位指定操作的超时时间。

示例

以下代码尝试模拟由交通信号灯状态(绿色或红色)控制的交通流量。

程序中有两个线程,它们分别针对两个不同的函数。signal_state() 函数定期设置和重置事件,表示信号从绿色变为红色。

traffic_flow() 函数等待事件被设置,并在事件保持设置状态时运行循环。

from threading import Event, Thread
import time

terminate = False

def signal_state():
    global terminate
    while not terminate:
        time.sleep(0.5)
        print("Traffic Police Giving GREEN Signal")
        event.set()
        time.sleep(1)
        print("Traffic Police Giving RED Signal")
        event.clear()

def traffic_flow():
    global terminate
    num = 0
    while num < 10 and not terminate:
        print("Waiting for GREEN Signal")
        event.wait()
        print("GREEN Signal ... Traffic can move")
        while event.is_set() and not terminate:
            num += 1
            print("Vehicle No:", num," Crossing the Signal")
            time.sleep(1)
        print("RED Signal ... Traffic has to wait")

event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()

# Terminate the threads after some time
time.sleep(5)
terminate = True

# join all threads to complete
t1.join()
t2.join()

print("Exiting Main Thread")

输出

执行上述代码后,您将获得以下输出:

Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 2  Crossing the Signal
Vehicle No: 3  Crossing the Signal
Traffic Police Giving RED Signal
Traffic Police Giving GREEN Signal
Vehicle No: 4  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
Exiting Main Thread

Condition 对象

Python 的threading模块中的 Condition 对象提供了一种更高级的同步机制。它允许线程在继续执行之前等待来自另一个线程的通知。Condition 对象始终与锁相关联,并提供线程间信号机制。

以下是 threading.Condition() 类的语法:

threading.Condition(lock=None)

以下是 Condition 对象的关键方法:

  • acquire(*args):获取底层锁。此方法调用底层锁上的相应方法;返回值是该方法返回的任何内容。
  • release():释放底层锁。此方法调用底层锁上的相应方法;没有返回值。
  • wait(timeout=None): 此方法释放底层锁,然后阻塞,直到另一个线程对同一个条件变量调用 notify() 或 notify_all() 方法唤醒它,或者直到可选的超时发生。一旦被唤醒或超时,它会重新获取锁并返回。
  • wait_for(predicate, timeout=None): 此实用程序方法可能会重复调用 wait(),直到满足谓词或发生超时。返回值是谓词的最后返回值,如果方法超时则将评估为 False。
  • notify(n=1): 此方法最多唤醒 n 个等待条件变量的线程;如果没有任何线程正在等待,则此方法什么也不做。
  • notify_all(): 唤醒所有等待此条件的线程。此方法类似于 notify(),但它会唤醒所有等待线程,而不是一个线程。如果调用线程在调用此方法时未获取锁,则会引发 RuntimeError。

示例

此示例演示了使用 Python threading 模块的 Condition 对象进行线程间通信的一种简单形式。这里 thread_athread_b 使用 Condition 对象进行通信,thread_a 等待直到收到来自 thread_b 的通知。thread_b 休眠 2 秒后通知 thread_a,然后结束。

from threading import Condition, Thread
import time

c = Condition()

def thread_a():
    print("Thread A started")
    with c:
        print("Thread A waiting for permission...")
        c.wait()
        print("Thread A got permission!")
    print("Thread A finished")

def thread_b():
    print("Thread B started")
    with c:
        time.sleep(2)
        print("Notifying Thread A...")
        c.notify()
    print("Thread B finished")

Thread(target=thread_a).start()
Thread(target=thread_b).start()

输出

执行上述代码后,您将获得以下输出:

Thread A started
Thread A waiting for permission...
Thread B started
Notifying Thread A...
Thread B finished
Thread A got permission!
Thread A finished

示例

这是另一个代码示例,演示了如何使用 Condition 对象在线程之间进行通信。在这个例子中,线程 t2 运行 taskB() 函数,线程 t1 运行 taskA() 函数。t1 线程获取条件并发出通知。

此时,t2 线程处于等待状态。条件释放后,等待线程继续使用通知函数生成的随机数。

from threading import Condition, Thread
import time
import random

numbers = []

def taskA(c):
    for _ in range(5):
        with c:
            num = random.randint(1, 10)
            print("Generated random number:", num)
            numbers.append(num)
            print("Notification issued")
            c.notify()
        time.sleep(0.3)

def taskB(c):
    for i in range(5):
        with c:
            print("waiting for update")
            while not numbers: 
                c.wait()
            print("Obtained random number", numbers.pop())
        time.sleep(0.3)

c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Done")

执行此代码时,将产生以下输出

waiting for update
Generated random number: 2
Notification issued
Obtained random number 2
Generated random number: 5
Notification issued
waiting for update
Obtained random number 5
Generated random number: 1
Notification issued
waiting for update
Obtained random number 1
Generated random number: 9
Notification issued
waiting for update
Obtained random number 9
Generated random number: 2
Notification issued
waiting for update
Obtained random number 2
Done
广告