Python 中的进程同步和池化
进程之间的同步
Multiprocessing 是一个支持使用 API 生成进程的包。此包用于本地和远程并发。使用此模块,程序员可以在给定机器上使用多个处理器。它运行在 Windows 和 UNIX 操作系统上。
此包中提供了所有等效的同步原语。
示例代码
from multiprocessing import Process, Lock def my_function(x, y): x.acquire() print ('hello world', y) x.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target= my_function, args=(lock, num)).start()
这里一个实例可以锁定以确保一次只有一个进程可以显示标准输出。
池化
对于池化,我们使用 Pool 类。当一个人可以创建一个进程池来承载提交给它的所有任务时。
class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
池对象控制一个工作程序池,以选择可以提交哪些作业,并且它支持具有超时、回调和并行映射实现的异步结果。
如果进程为 None,则使用 cpu_count(),如果初始化程序不为 None,则此函数调用 initializer(*initargs)。
apply(func[, args[, kwds]])
这与 apply() 内置函数相同。它会阻塞直到结果准备就绪,如果它希望并行执行,则 apply_async() 方法更好。
apply_async(func[, args[, kwds[, callback]]])
返回结果对象。
map(func, iterable [, chunksize])
map() 是一个内置函数,它只支持一个可迭代参数。它会阻塞直到结果准备就绪。
在此方法中,可迭代对象被分成多个小块,这些小块作为单独的任务提交到进程池。
map_async(func, iterable[, chunksize[, callback]])
返回结果对象。
imap(func, iterable[, chunksize])
它与 itertools.imap() 相同。
参数的大小与 map() 中使用的大小相同。
imap_unordered(func, iterable[, chunksize])
这与 imap() 相同,只是返回的迭代器不必按顺序排列。
close()
当工作程序完成所有任务后,工作程序退出进程。
terminate()
如果我们希望立即停止工作程序进程而不完成任务,则使用此方法。
join()
在使用 join() 方法之前,我们必须使用 close() 和 terminate() 函数。
class multiprocessing.pool.AsyncResult
由 Pool.apply_async() 和 Pool.map_async() 返回。
get([timeout])
此函数在结果到达时返回结果。
wait([timeout])
使用此 wait 函数,我们等待结果可用或直到超时秒过去。
ready()
此函数返回调用是否已完成。
successful()
此函数在调用在没有任何错误的情况下完成时返回。
示例代码
# -*- coding: utf-8 -*- """ Created on Sun Sep 30 12:17:58 2018 @author: Tutorials Point """ from multiprocessing import Pool import time def myfunction(m): return m*m if __name__ == '__main__': my_pool = Pool(processes=4) # start 4 worker processes result = my_pool.apply_async(myfunction, (10,)) # evaluate "f(10)" asynchronously in a single process print (result.get(timeout=1)) print (my_pool.map(myfunction, range(10))) # prints "[0, 1, 4,..., 81]" my_it = my_pool.imap(myfunction, range(10)) print (my_it.next() ) # prints "0" print (my_it.next() ) # prints "1" print (my_it.next(timeout=1) ) # prints "4" unless your computer is *very* slow result = my_pool.apply_async(time.sleep, (10,)) print (result.get(timeout=1) ) # raises multiprocessing.TimeoutError