线程同步
线程同步可以定义为一种方法,通过这种方法,我们可以确保两个或多个并发线程不会同时访问称为临界区的程序段。另一方面,众所周知,临界区是程序中访问共享资源的部分。因此,我们可以说同步是确保两个或多个线程不会通过同时访问资源来相互干扰的过程。下图显示了四个线程试图同时访问程序的临界区。
为了更清楚地说明,假设两个或多个线程试图同时在列表中添加对象。此操作无法成功结束,因为它要么会丢弃一个或所有对象,要么会完全破坏列表的状态。这里的同步作用是,一次只能有一个线程访问列表。
线程同步中的问题
在实现并发编程或应用同步原语时,我们可能会遇到问题。在本节中,我们将讨论两个主要问题。这些问题是:
- 死锁
- 竞争条件
竞争条件
这是并发编程中的一个主要问题。对共享资源的并发访问可能导致竞争条件。竞争条件可以定义为当两个或多个线程可以访问共享数据并尝试同时更改其值时发生的情况。由于此原因,变量的值可能不可预测,并且会根据进程上下文切换的时机而有所不同。
示例
考虑以下示例以了解竞争条件的概念:
步骤 1 - 在此步骤中,我们需要导入 threading 模块 -
import threading
步骤 2 - 现在,定义一个全局变量,例如 x,以及其值为 0 -
x = 0
步骤 3 - 现在,我们需要定义 increment_global() 函数,它将在该全局函数 x 中递增 1 -
def increment_global(): global x x += 1
步骤 4 - 在此步骤中,我们将定义 taskofThread() 函数,它将调用 increment_global() 函数指定次数;对于我们的示例,为 50000 次 -
def taskofThread(): for _ in range(50000): increment_global()
步骤 5 - 现在,定义 main() 函数,其中创建线程 t1 和 t2。两者都将使用 start() 函数启动,并使用 join() 函数等待它们完成工作。
def main(): global x x = 0 t1 = threading.Thread(target= taskofThread) t2 = threading.Thread(target= taskofThread) t1.start() t2.start() t1.join() t2.join()
步骤 6 - 现在,我们需要指定范围,即我们希望调用 main() 函数多少次。在这里,我们调用它 5 次。
if __name__ == "__main__": for i in range(5): main() print("x = {1} after Iteration {0}".format(i,x))
在下面显示的输出中,我们可以看到竞争条件的影响,因为每次迭代后 x 的值预期为 100000。但是,值存在很大差异。这是由于线程对共享全局变量 x 的并发访问造成的。
输出
x = 100000 after Iteration 0 x = 54034 after Iteration 1 x = 80230 after Iteration 2 x = 93602 after Iteration 3 x = 93289 after Iteration 4
使用锁处理竞争条件
正如我们在上面的程序中看到了竞争条件的影响,我们需要一个同步工具来处理多个线程之间的竞争条件。在 Python 中,<threading> 模块提供 Lock 类来处理竞争条件。此外,Lock 类提供了不同的方法,我们可以通过这些方法来处理多个线程之间的竞争条件。这些方法描述如下:
acquire() 方法
此方法用于获取,即阻塞锁。锁可以是阻塞的或非阻塞的,具体取决于以下真或假值:
将值设置为 True - 如果使用 True 调用 acquire() 方法(这是默认参数),则线程执行将被阻塞,直到锁被解锁。
将值设置为 False - 如果使用 False 调用 acquire() 方法(这不是默认参数),则线程执行不会被阻塞,直到它被设置为 true,即直到它被锁定。
release() 方法
此方法用于释放锁。以下是一些与此方法相关的重要的任务:
如果锁已锁定,则 release() 方法将解锁它。它的作用是在多个线程被阻塞并等待锁解锁时,允许恰好一个线程继续执行。
如果锁已解锁,它将引发 ThreadError。
现在,我们可以使用 Lock 类及其方法重写上面的程序,以避免竞争条件。我们需要使用 lock 参数定义 taskofThread() 方法,然后需要使用 acquire() 和 release() 方法来阻塞和非阻塞锁,以避免竞争条件。
示例
以下是 Python 程序的示例,用于了解处理竞争条件的锁的概念:
import threading x = 0 def increment_global(): global x x += 1 def taskofThread(lock): for _ in range(50000): lock.acquire() increment_global() lock.release() def main(): global x x = 0 lock = threading.Lock() t1 = threading.Thread(target = taskofThread, args = (lock,)) t2 = threading.Thread(target = taskofThread, args = (lock,)) t1.start() t2.start() t1.join() t2.join() if __name__ == "__main__": for i in range(5): main() print("x = {1} after Iteration {0}".format(i,x))
以下输出显示竞争条件的影响被忽略了;因为每次迭代后 x 的值现在都是 100000,这符合该程序的预期。
输出
x = 100000 after Iteration 0 x = 100000 after Iteration 1 x = 100000 after Iteration 2 x = 100000 after Iteration 3 x = 100000 after Iteration 4
死锁 - 哲学家就餐问题
死锁是设计并发系统时可能遇到的一个麻烦问题。我们可以通过哲学家就餐问题来说明这个问题,如下所示:
Edsger Dijkstra 最初提出了哲学家就餐问题,这是并发系统最大问题之一的著名示例,称为死锁。
在这个问题中,有五位著名的哲学家围坐在一张圆桌旁,从他们的碗里吃一些食物。有五把叉子可以供五位哲学家使用来吃饭。但是,哲学家决定同时使用两把叉子来吃饭。
现在,哲学家有两个主要条件。首先,每个哲学家可以处于吃饭状态或思考状态,其次,他们必须首先获得两把叉子,即左边和右边。当所有五位哲学家都设法同时拿起左边的叉子时,问题就出现了。现在他们都在等待右边的叉子空闲,但他们永远不会放弃他们的叉子,直到他们吃完饭,而右边的叉子永远不会可用。因此,餐桌上会出现死锁状态。
并发系统中的死锁
现在如果我们看到,同样的问题也可能出现在我们的并发系统中。上面示例中的叉子将是系统资源,每个哲学家可以表示正在竞争获取资源的进程。
Python 程序解决方案
这个问题的解决方案可以通过将哲学家分成两种类型来找到:贪婪哲学家和慷慨哲学家。主要是一个贪婪的哲学家会试图拿起左边的叉子并等待它在那里。然后,他将等待右边的叉子在那里,拿起它,吃掉它,然后放下它。另一方面,一个慷慨的哲学家会试图拿起左边的叉子,如果它不在那里,他将等待并在一段时间后再次尝试。如果他们得到左边的叉子,他们将尝试得到右边的叉子。如果他们也得到右边的叉子,他们将吃饭并释放两个叉子。但是,如果他们没有得到右边的叉子,他们将释放左边的叉子。
示例
以下 Python 程序将帮助我们找到哲学家就餐问题的解决方案:
import threading import random import time class DiningPhilosopher(threading.Thread): running = True def __init__(self, xname, Leftfork, Rightfork): threading.Thread.__init__(self) self.name = xname self.Leftfork = Leftfork self.Rightfork = Rightfork def run(self): while(self.running): time.sleep( random.uniform(3,13)) print ('%s is hungry.' % self.name) self.dine() def dine(self): fork1, fork2 = self.Leftfork, self.Rightfork while self.running: fork1.acquire(True) locked = fork2.acquire(False) if locked: break fork1.release() print ('%s swaps forks' % self.name) fork1, fork2 = fork2, fork1 else: return self.dining() fork2.release() fork1.release() def dining(self): print ('%s starts eating '% self.name) time.sleep(random.uniform(1,10)) print ('%s finishes eating and now thinking.' % self.name) def Dining_Philosophers(): forks = [threading.Lock() for n in range(5)] philosopherNames = ('1st','2nd','3rd','4th', '5th') philosophers= [DiningPhilosopher(philosopherNames[i], forks[i%5], forks[(i+1)%5]) \ for i in range(5)] random.seed() DiningPhilosopher.running = True for p in philosophers: p.start() time.sleep(30) DiningPhilosopher.running = False print (" It is finishing.") Dining_Philosophers()
上面的程序使用了贪婪和慷慨哲学家的概念。该程序还使用了 <threading> 模块的 Lock 类的 acquire() 和 release() 方法。我们可以在以下输出中看到解决方案:
输出
4th is hungry. 4th starts eating 1st is hungry. 1st starts eating 2nd is hungry. 5th is hungry. 3rd is hungry. 1st finishes eating and now thinking.3rd swaps forks 2nd starts eating 4th finishes eating and now thinking. 3rd swaps forks5th starts eating 5th finishes eating and now thinking. 4th is hungry. 4th starts eating 2nd finishes eating and now thinking. 3rd swaps forks 1st is hungry. 1st starts eating 4th finishes eating and now thinking. 3rd starts eating 5th is hungry. 5th swaps forks 1st finishes eating and now thinking. 5th starts eating 2nd is hungry. 2nd swaps forks 4th is hungry. 5th finishes eating and now thinking. 3rd finishes eating and now thinking. 2nd starts eating 4th starts eating It is finishing.