Python 中的进程同步和池化


进程之间的同步

Multiprocessing 是一个支持使用 API 生成进程的包。此包用于本地和远程并发。使用此模块,程序员可以在给定机器上使用多个处理器。它运行在 Windows 和 UNIX 操作系统上。

此包中提供了所有等效的同步原语。

示例代码

from multiprocessing import Process, Lock
   def my_function(x, y):
      x.acquire()
      print ('hello world', y)
      x.release()
      if __name__ == '__main__':
      lock = Lock()
   for num in range(10):
Process(target= my_function, args=(lock, num)).start()

这里一个实例可以锁定以确保一次只有一个进程可以显示标准输出。

池化

对于池化,我们使用 Pool 类。当一个人可以创建一个进程池来承载提交给它的所有任务时。

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

池对象控制一个工作程序池,以选择可以提交哪些作业,并且它支持具有超时、回调和并行映射实现的异步结果。

如果进程为 None,则使用 cpu_count(),如果初始化程序不为 None,则此函数调用 initializer(*initargs)。

apply(func[, args[, kwds]])

这与 apply() 内置函数相同。它会阻塞直到结果准备就绪,如果它希望并行执行,则 apply_async() 方法更好。

apply_async(func[, args[, kwds[, callback]]])

返回结果对象。

map(func, iterable [, chunksize])

map() 是一个内置函数,它只支持一个可迭代参数。它会阻塞直到结果准备就绪。

在此方法中,可迭代对象被分成多个小块,这些小块作为单独的任务提交到进程池。

map_async(func, iterable[, chunksize[, callback]])

返回结果对象。

imap(func, iterable[, chunksize])

它与 itertools.imap() 相同。

参数的大小与 map() 中使用的大小相同。

imap_unordered(func, iterable[, chunksize])

这与 imap() 相同,只是返回的迭代器不必按顺序排列。

close()

当工作程序完成所有任务后,工作程序退出进程。

terminate()

如果我们希望立即停止工作程序进程而不完成任务,则使用此方法。

join()

在使用 join() 方法之前,我们必须使用 close() 和 terminate() 函数。

class multiprocessing.pool.AsyncResult

由 Pool.apply_async() 和 Pool.map_async() 返回。

get([timeout])

此函数在结果到达时返回结果。

wait([timeout])

使用此 wait 函数,我们等待结果可用或直到超时秒过去。

ready()

此函数返回调用是否已完成。

successful()

此函数在调用在没有任何错误的情况下完成时返回。

示例代码

# -*- coding: utf-8 -*-
"""
Created on Sun Sep 30 12:17:58 2018
@author: Tutorials Point
"""
from multiprocessing import Pool
import time
def myfunction(m):
return m*m
if __name__ == '__main__':
my_pool = Pool(processes=4) # start 4 worker processes
result = my_pool.apply_async(myfunction, (10,)) # evaluate "f(10)" asynchronously in a single process
print (result.get(timeout=1))
print (my_pool.map(myfunction, range(10))) # prints "[0, 1, 4,..., 81]"
my_it = my_pool.imap(myfunction, range(10))
print (my_it.next() ) # prints "0"
print (my_it.next() ) # prints "1"
print (my_it.next(timeout=1) ) # prints "4" unless your computer is *very* slow
result = my_pool.apply_async(time.sleep, (10,))
print (result.get(timeout=1) ) # raises multiprocessing.TimeoutError

更新于: 2020-06-26

627 次查看

启动您的 职业生涯

通过完成课程获得认证

开始
广告