RxPY 快速指南



RxPY - 概述

本章解释什么是响应式编程,什么是 RxPY,它的运算符、特性、优点和缺点。

什么是响应式编程?

响应式编程是一种编程范式,它处理数据流和变化的传播。这意味着,当一个组件发出数据流时,变化将通过响应式编程库传播到其他组件。变化的传播将持续到到达最终接收者。

使用 RxPY,您可以很好地控制异步数据流,例如,可以使用 Observable 追踪对 URL 的请求,并使用 Observer 监听请求何时完成以获取响应或错误。

RxPY 允许您使用**Observables** 处理异步数据流,使用**Operators**(例如 filter、sum、concat、map)查询数据流,并使用**Schedulers**利用数据流的并发性。创建一个 Observable 会得到一个带有 on_next(v)、on_error(e) 和 on_completed() 方法的 Observer 对象,需要对其进行**订阅**才能在事件发生时收到通知。

Observable

可以使用管道运算符以链式格式使用多个运算符查询 Observable。

RxPY 提供各种类别的运算符:−

  • 数学运算符

  • 转换运算符

  • 过滤运算符

  • 错误处理运算符

  • 实用程序运算符

  • 条件运算符

  • 创建运算符

  • 可连接运算符

本教程将详细解释这些运算符。

什么是 RxPy?

根据 RxPy 的官方网站 https://rxpy.readthedocs.io/en/latest/. 的定义,RxPY 被定义为**一个用于使用可观察集合和可管道查询运算符在 Python 中组合异步和基于事件的程序的库**。

RxPY 是一个支持响应式编程的 Python 库。RxPY 代表**Python 的响应式扩展**。它是一个使用 Observable 来处理响应式编程的库,该库处理异步数据调用、回调和基于事件的程序。

RxPy 的特性

在 RxPy 中,以下概念负责处理异步任务 −

Observable

Observable 是一个创建 Observer 并将其附加到数据源的函数,该数据源包含预期的的数据流,例如推文、计算机相关事件等。

Observer

它是一个具有 on_next()、on_error() 和 on_completed() 方法的对象,当与 Observable 交互时(例如,传入推文等),这些方法将被调用。

Subscription (订阅)

创建 Observable 后,需要订阅它才能执行。

Operators (运算符)

运算符是一个纯函数,它以 Observable 作为输入,输出也是一个 Observable。您可以使用管道运算符在 Observable 数据上使用多个运算符。

Subject

Subject 既是一个 Observable 序列,也是一个 Observer,可以进行多播,即与许多已订阅的 Observer 通信。Subject 是一个冷 Observable,即值将在已订阅的 Observer 之间共享。

Schedulers (调度器)

RxPy 的一个重要特性是并发性,即允许任务并行执行。为此,RxPy 有两个运算符 subscribe_on() 和 observe_on() 与调度器一起工作,并决定订阅任务的执行。

使用 RxPY 的优点

以下是 RxPy 的优点 −

  • 在处理异步数据流和事件方面,RxPY 是一个很棒的库。RxPY 使用 Observable 来处理响应式编程,该编程处理异步数据调用、回调和基于事件的程序。

  • RxPy 提供了大量的运算符,包括数学、转换、过滤、实用程序、条件、错误处理、连接类别,这使得在使用响应式编程时更容易。

  • 在 RxPY 中使用调度器可以实现并发,即多个任务一起工作。

  • 使用 RxPY 可以提高性能,因为异步任务和并行处理更容易实现。

使用 RxPY 的缺点

  • 使用 Observable 调试代码有点困难。

RxPY - 环境搭建

本章将介绍 RxPy 的安装。要开始使用 RxPy,首先需要安装 Python。因此,我们将进行以下操作 −

  • 安装 Python
  • 安装 RxPy

安装 Python

访问 Python 官方网站:https://pythonlang.cn/downloads/. 如下所示,点击适用于 Windows、Linux/Unix 和 macOS 的最新版本。根据您的 64 位或 32 位操作系统下载 Python。

Python

下载完成后,点击**.exe 文件**并按照步骤在您的系统上安装 python。

Python Install

python 包管理器 pip 也将随上述安装默认安装。为了使其在您的系统上全局工作,直接将 python 的位置添加到 PATH 变量中,安装开始时也会显示相同的内容,请记住选中“添加到 PATH”复选框。如果您忘记选中它,请按照以下步骤添加到 PATH。

要添加到 PATH,请按照以下步骤操作 −

右键单击您的计算机图标,然后单击属性→高级系统设置。

将显示如下屏幕 −

System Properties

单击上面显示的环境变量。将显示如下屏幕 −

Environment Variable

选择 Path 并单击“编辑”按钮,在末尾添加 python 的位置路径。现在,让我们检查 python 版本。

检查 python 版本

E:\pyrx>python --version
Python 3.7.3

安装 RxPY

现在,我们已经安装了 python,我们将安装 RxPy。

安装 python 后,python 包管理器 pip 也将安装。以下是检查 pip 版本的命令 −

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

我们已经安装了 pip,版本为**19.1.1**。现在,我们将使用 pip 安装 RxPy

命令如下 −

pip install rx
Pip Install Rx

RxPY - 最新版本更新

在本教程中,我们使用的是 RxPY 版本 3 和 python 版本 3.7.3。RxPY 版本 3 的工作方式与早期版本(即 RxPY 版本 1)略有不同。

本章将讨论这两个版本之间的区别以及在更新 Python 和 RxPY 版本时需要进行的更改。

RxPY 中的 Observable

在 RxPY 版本 1 中,Observable 是一个单独的类 −

from rx import Observable

要使用 Observable,您必须按如下方式使用它 −

Observable.of(1,2,3,4,5,6,7,8,9,10)

在 RxPY 版本 3 中,Observable 直接是 rx 包的一部分。

示例

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

RxPY 中的运算符

在版本 1 中,运算符是 Observable 类中的方法。例如,要使用运算符,我们必须导入 Observable,如下所示 −

from rx import Observable

运算符用作 Observable.operator,例如,如下所示 −

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

对于 RxPY 版本 3,运算符是函数,导入和使用方式如下 −

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

使用 pipe() 方法链接运算符

在 RxPY 版本 1 中,如果您必须在 Observable 上使用多个运算符,则必须按如下方式进行 −

示例

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

但是,对于 RxPY 版本 3,您可以使用 pipe() 方法和多个运算符,如下所示 −

示例

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

RxPY - 使用 Observables

Observable 是一个创建 Observer 并将其附加到预期值的源的函数,例如,来自 DOM 元素的点击、鼠标事件等。

本章将详细介绍以下主题。

  • 创建 Observables

  • 订阅和执行 Observable

创建 Observables

要创建一个 Observable,我们将使用**create()** 方法并将函数传递给它,该函数包含以下项目。

  • **on_next()** − 当 Observable 发出项目时,此函数将被调用。

  • **on_completed()** − 当 Observable 完成时,此函数将被调用。

  • **on_error()** − 当 Observable 发生错误时,此函数将被调用。

要使用 create() 方法,首先导入该方法,如下所示 −

from rx import create

这是一个创建 Observable 的工作示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

订阅和执行 Observable

要订阅 Observable,我们需要使用 subscribe() 函数并将回调函数 on_next、on_error 和 on_completed 传递给它。

这是一个工作示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法负责执行 Observable。回调函数**on_next**、**on_error** 和**on_completed** 必须传递给 subscribe 方法。对 subscribe 方法的调用反过来又会执行 test_observable() 函数。

并非必须将所有三个回调函数都传递给 subscribe() 方法。您可以根据您的需求传递 on_next()、on_error() 和 on_completed()。

lambda 函数用于 on_next、on_error 和 on_completed。它将接收参数并执行给定的表达式。

以下是创建的 Observable 的输出 −

E:\pyrx>python testrx.py
Got - Hello
Job Done!

RxPY - 运算符

本章详细解释了 RxPY 中的运算符。这些运算符包括 −

  • 使用运算符
  • 数学运算符
  • 转换运算符
  • 过滤运算符
  • 错误处理运算符
  • 实用程序运算符
  • 条件运算符
  • 创建运算符
  • 可连接运算符
  • 组合运算符

Reactive (Rx) python 几乎有很多运算符,这使得 python 编码更容易。您可以将这些多个运算符一起使用,例如,在处理字符串时,您可以使用 map、filter、merge 运算符。

使用运算符

您可以使用 pipe() 方法一起使用多个运算符。此方法允许将多个运算符链接在一起。

这是一个使用运算符的工作示例 −

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

在上例中,我们使用 `of()` 方法创建了一个 Observable,该方法接收值 1、2 和 3。现在,在这个 Observable 上,您可以使用任意数量的操作符执行不同的操作,如上所示使用 `pipe()` 方法。操作符的执行将在给定的 Observable 上顺序进行。

要使用操作符,首先需要导入它,如下所示:

from rx import of, operators as op

这是一个可运行的例子:

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

在上例中,有一个数字列表,我们使用 `filter` 操作符过滤偶数,然后使用 `reduce` 操作符对其进行求和。

输出

E:\pyrx>python testrx.py
Sum of Even numbers is 30

以下是一些我们将要讨论的操作符:

  • 创建 Observable
  • 数学运算符
  • 转换运算符
  • 过滤运算符
  • 错误处理运算符
  • 实用程序运算符
  • 条件操作符
  • 可连接操作符
  • 组合运算符

创建 Observable

以下是我们将在创建类别中讨论的 Observable:

显示示例

Observable 描述
create 此方法用于创建 Observable。
empty 此 Observable 不会输出任何内容,并直接发出完成状态。
never 此方法创建一个永远不会到达完成状态的 Observable。
throw 此方法将创建一个抛出错误的 Observable。
from_ 此方法将给定的数组或对象转换为 Observable。
interval 此方法将在超时后产生一系列值。
just 此方法将给定值转换为 Observable。
range 此方法将根据给定的输入生成一系列整数。
repeat_value 此方法将创建一个 Observable,根据给定的计数重复给定的值。
start 此方法接收一个函数作为输入,并返回一个 Observable,该 Observable 将返回输入函数的值。
timer 此方法将在超时后顺序发出值。

数学运算符

我们将在数学运算符类别中讨论的操作符如下:

显示示例

运算符 描述
average 此运算符将计算给定源 Observable 的平均值,并输出一个包含平均值的 Observable。
concat 此运算符将接收两个或多个 Observable,并生成一个包含所有值的单个 Observable。
count

此运算符接收一个包含值的 Observable,并将其转换为一个包含单个值的 Observable。count 函数可选地接收一个谓词函数。

该函数的类型为布尔值,只有在满足条件时才会将值添加到输出中。

max 此运算符将返回一个包含源 Observable 中最大值的 Observable。
min 此运算符将返回一个包含源 Observable 中最小值的 Observable。
reduce 此运算符接收一个名为累加器函数的函数,该函数用于处理来自源 Observable 的值,并以 Observable 的形式返回累加值,并可以选择向累加器函数传递种子值。
sum 此运算符将返回一个包含所有源 Observable 值之和的 Observable。

转换运算符

我们将要讨论的转换运算符类别中的运算符如下:

显示示例

运算符 类别
buffer 此运算符将收集来自源 Observable 的所有值,并在满足给定的边界条件后定期发出它们。
group_by 此运算符将根据给定的 key_mapper 函数对来自源 Observable 的值进行分组。
map 此运算符将根据给定的 mapper_func 的输出,将源 Observable 中的每个值更改为一个新值。
scan 此运算符将对来自源 Observable 的值应用累加器函数,并返回一个包含新值的新 Observable。

过滤运算符

我们将要讨论的过滤运算符类别中的运算符如下:

显示示例

运算符 类别
debounce 此运算符将发出来自源 Observable 的值,直到给定的时间跨度,并忽略其余时间段。
distinct 此运算符将发出所有与源 Observable 中不同的值。
element_at 此运算符将根据给定的索引发出源 Observable 中的一个元素。
filter 此运算符将根据给定的谓词函数过滤源 Observable 中的值。
first 此运算符将发出源 Observable 中的第一个元素。
ignore_elements 此运算符将忽略源 Observable 中的所有值,只执行对 complete 或 error 回调函数的调用。
last 此运算符将发出源 Observable 中的最后一个元素。
skip 此运算符将返回一个 Observable,该 Observable 将跳过作为输入的 count 个项目的第一次出现。
skip_last 此运算符将返回一个 Observable,该 Observable 将跳过作为输入的 count 个项目的最后一次出现。
take 此运算符将根据给定的计数返回连续顺序的源值列表。
take_last 此运算符将根据给定的计数从最后返回连续顺序的源值列表。

错误处理运算符

我们将要讨论的错误处理运算符类别中的运算符如下:

显示示例

运算符 描述
catch 当出现异常时,此运算符将终止源 Observable。
retry 当出现错误时,此运算符将重试源 Observable,并在重试次数完成后终止。

实用程序运算符

以下是我们将在实用程序运算符类别中讨论的操作符。

显示示例

运算符 描述
delay 此运算符将根据给定的时间或日期延迟源 Observable 的发射。
materialize 此运算符将源 Observable 中的值转换为以显式通知值的形式发出的值。
time_interval 此运算符将给出源 Observable 中的值之间经过的时间。
timeout 此运算符将在经过时间后发出源 Observable 中的所有值,否则将触发错误。
timestamp 此运算符将时间戳附加到源 Observable 中的所有值。

条件和布尔运算符

我们将要讨论的条件和布尔运算符类别中的运算符如下:

显示示例

运算符 描述
all 此运算符将检查源 Observable 中的所有值是否都满足给定的条件。
contains 如果给定值存在且是源 Observable 的值,则此运算符将返回一个值为 true 或 false 的 Observable。
default_if_empty 如果源 Observable 为空,则此运算符将返回一个默认值。
sequence_equal 此运算符将比较两个 Observable 序列或一个值数组,并返回一个值为 true 或 false 的 Observable。
skip_until 此运算符将丢弃源 Observable 中的值,直到第二个 Observable 发出一个值。
skip_while 此运算符将返回一个包含满足所传递条件的源 Observable 中值的 Observable。
take_until 此运算符将在第二个 Observable 发出一个值或终止后丢弃源 Observable 中的值。
take_while 此运算符将在条件失败时丢弃源 Observable 中的值。

可连接运算符

我们将要讨论的可连接运算符类别中的运算符如下:

显示示例

运算符 描述
publish 此方法将 Observable 转换为可连接的 Observable。
ref_count 此运算符将 Observable 转换为普通的 Observable。
replay 此方法类似于 replaySubject。即使 Observable 已经发出值,并且一些订阅者订阅较晚,此方法也会返回相同的值。

组合运算符

以下是我们将在组合运算符类别中讨论的操作符。

显示示例

运算符 描述
combine_latest 此运算符将为给定的输入 Observable 创建一个元组。
merge 此运算符将合并给定的 Observable。
start_with 此运算符将接收给定的值,并将其添加到源 Observable 的开头,然后返回完整的序列。
zip 此运算符返回一个包含元组形式值的 Observable,该元组是通过获取给定 Observable 的第一个值等等形成的。

RxPY - 使用 Subject

Subject 是一个 Observable 序列,也是一个可以多播(即与许多已订阅的观察者对话)的观察者。

我们将讨论以下关于 Subject 的主题:

  • 创建 Subject
  • 订阅 Subject
  • 向 Subject 传递数据
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

创建 Subject

要使用 Subject,我们需要导入 Subject,如下所示:

from rx.subject import Subject

您可以如下创建 Subject 对象:

subject_test = Subject()

该对象是一个观察者,具有三种方法:

  • on_next(value)
  • on_error(error) 和
  • on_completed()

订阅 Subject

您可以在 Subject 上创建多个订阅,如下所示:

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

向 Subject 传递数据

您可以使用 on_next(value) 方法向创建的 Subject 传递数据,如下所示:

subject_test.on_next("A")
subject_test.on_next("B")

数据将传递给添加到 Subject 的所有订阅。

这是一个 Subject 的可运行示例。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 对象是通过调用 Subject() 创建的。subject_test 对象引用 on_next(value)、on_error(error) 和 on_completed() 方法。上例的输出如下所示:

输出

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我们可以使用 on_completed() 方法停止 Subject 的执行,如下所示。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我们调用 complete,稍后调用的 next 方法将不会被调用。

输出

E:\pyrx>python testrx.py
The value is A
The value is A

现在让我们看看如何调用 on_error(error) 方法。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

输出

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject 在调用时会提供最新的值。您可以如下创建 BehaviorSubject:

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

这是一个使用 BehaviorSubject 的可运行示例

示例

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

输出

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay Subject

ReplaySubject 类似于 BehaviorSubject,它可以缓冲值并将它们重播给新的订阅者。这是一个 ReplaySubject 的可运行示例。

示例

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

ReplaySubject 上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。

输出

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

在 AsyncSubject 的情况下,最后一个调用的值将传递给订阅者,并且只有在调用 complete() 方法后才会完成。

示例

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

输出

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

RxPY - 使用 Scheduler 实现并发

RxPy的一个重要特性是并发性,即允许任务并行执行。为此,我们有两个操作符`subscribe_on()`和`observe_on()`,它们将与调度器一起工作,调度器将决定订阅任务的执行。

这是一个有效的例子,它展示了`subscribe_on()`、`observe_on()`和调度器的必要性。

示例

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

在上面的例子中,我有两个任务:任务1和任务2。任务的执行是顺序的。只有当第一个任务完成后,第二个任务才会开始。

输出

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy支持许多调度器,这里我们将使用`ThreadPoolScheduler`。`ThreadPoolScheduler`主要尝试管理可用的CPU线程。

在前面看到的例子中,我们将使用一个多处理模块,它将给我们提供`cpu_count`。这个计数将被提供给`ThreadPoolScheduler`,它将设法根据可用的线程来管理并行执行的任务。

这是一个可运行的例子:

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

在上面的例子中,我有两个任务,`cpu_count`是4。由于任务是2个,而我们可用的线程是4个,所以这两个任务可以并行启动。

输出

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

如果你看到输出,这两个任务已经并行启动了。

现在,考虑一种情况,任务数量超过CPU数量,例如CPU数量是4,任务是5个。在这种情况下,我们需要检查是否有任何线程在任务完成后空闲,以便可以将其分配给队列中可用的新任务。

为此,我们可以使用`observe_on()`操作符,它将观察调度器是否有任何线程空闲。这是一个使用`observe_on()`的有效例子。

示例

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

输出

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

如果你看到输出,任务4完成后,线程将被分配给下一个任务,即任务5,并且它开始执行。

RxPY - 例子

本章将详细讨论以下主题:

  • 展示observable、操作符和订阅观察者的基本示例。
  • observable和subject的区别。
  • 理解冷observable和热observable。

下面是一个基本示例,展示了observable、操作符和订阅观察者的工作方式。

示例

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

这是一个非常简单的例子,我从这个URL获取用户数据:

https://jsonplaceholder.typicode.com/users.

过滤数据,只显示名称以“C”开头的用户,然后使用map函数只返回名称。以下是输出:

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

observable和subject的区别

在这个例子中,我们将看到observable和subject的区别。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

输出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的例子中,每次订阅observable时,它都会给你新的值。

Subject示例

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

输出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果你看到,值在使用subject的两个订阅者之间共享。

理解冷observable和热observable

observable被分类为

  • 冷observable
  • 热observable

当多个订阅者订阅时,observable的区别就会显现出来。

冷observable

冷observable是在每次订阅时执行并渲染数据的observable。当它被订阅时,observable被执行,并给出新的值。

下面的例子说明了冷observable。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

输出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的例子中,每次订阅observable时,它都会执行observable并发出值。正如上面例子所示,这些值在不同的订阅者之间也可能不同。

热observable

对于热observable,它们会在准备好时发出值,并且不会总是等待订阅。当值被发出时,所有订阅者都会收到相同的值。

当你想在observable准备好时发出值,或者你想将相同的值共享给所有订阅者时,可以使用热observable。

热observable的例子是Subject和可连接操作符。

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

输出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果你看到,相同的值在订阅者之间共享。你可以使用`publish()`可连接observable操作符来实现相同的功能。

广告
© . All rights reserved.