RxPY - 使用调度器实现并发
RxPY 的一个重要特性是并发性,即允许任务并行执行。为此,我们有两个运算符 `subscribe_on()` 和 `observe_on()`,它们与调度器一起工作,调度器将决定订阅任务的执行。
这是一个展示 `subscribe_on()`、`observe_on()` 和调度器需求的工作示例。
示例
import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
在上面的示例中,我有两个任务:任务 1 和任务 2。任务的执行是顺序的。只有在第一个任务完成后,第二个任务才会开始。
输出
E:\pyrx>python testrx.py From Task 1: 1 From Task 1: 2 From Task 1: 3 From Task 1: 4 From Task 1: 5 Task 1 complete From Task 2: 1 From Task 2: 2 From Task 2: 3 From Task 2: 4 Task 2 complete
RxPY 支持许多调度器,这里我们将使用 `ThreadPoolScheduler`。`ThreadPoolScheduler` 主要尝试管理可用的 CPU 线程。
在前面看到的示例中,我们将使用一个多处理模块,该模块将提供 `cpu_count`。该计数将提供给 `ThreadPoolScheduler`,它将根据可用的线程来管理并行执行任务。
这是一个工作示例:
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
在上面的示例中,我有 2 个任务,`cpu_count` 为 4。由于任务数为 2,而我们可用的线程数为 4,因此两个任务可以并行启动。
输出
E:\pyrx>python testrx.py Cpu count is : 4 Press any key to exit From Task 1: 1 From Task 2: 1 From Task 1: 2 From Task 2: 2 From Task 2: 3 From Task 1: 3 From Task 2: 4 Task 2 complete From Task 1: 4 From Task 1: 5 Task 1 complete
如果查看输出,则两个任务已并行启动。
现在,考虑一个任务数超过 CPU 核心数的情况,例如 CPU 核心数为 4,任务数为 5。在这种情况下,我们需要检查是否有任何线程在任务完成后空闲,以便可以将其分配给队列中可用的新任务。
为此,我们可以使用 `observe_on()` 运算符,它将观察调度器是否有任何空闲线程。这是一个使用 `observe_on()` 的工作示例。
示例
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 3: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 4: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.observe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 5: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 5 complete")
)
input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py Cpu count is : 4 From Task 4: 1 From Task 4: 2 From Task 1: 1 From Task 2: 1 From Task 3: 1 From Task 1: 2 From Task 3: 2 From Task 4: 3 From Task 3: 3 From Task 2: 2 From Task 1: 3 From Task 4: 4 Task 4 complete From Task 5: 1 From Task 5: 2 From Task 5: 3 From Task 3: 4 Task 3 complete From Task 2: 3 Press any key to exit From Task 5: 4 Task 5 complete From Task 1: 4 From Task 2: 4 Task 2 complete From Task 1: 5 Task 1 complete
如果查看输出,任务 4 完成的那一刻,线程将被分配给下一个任务,即任务 5,并且该任务开始执行。
广告