RxPY - 使用可观察数组



可观察数组是一个创建观察者并将其附加到预期会有值来源的函数,例如,点击、DOM 元素的鼠标事件等等。

本节具体讲述以下内容。

  • 创建可观察数组

  • 订阅并执行一个可观察数组

创建可观察数组

要创建一个可观察数组,我们将使用 create() 方法,并向其传递具有以下项的函数。

  • on_next() − 当可观察数组发出一个项时,将调用此函数。

  • on_completed() − 当可观察数组完成时,将调用此函数。

  • on_error() − 当可观察数组上发生错误时,将调用此函数。

若要使用 create() 方法,首先要按如下所示导入该方法 −

from rx import create

以下是一个创建可观察数组的示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

订阅并执行一个可观察数组

若要订阅一个可观察数组,我们需要使用 subscribe() 函数,并传递回调函数 on_next、on_error 和 on_completed。

以下是一个示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法负责执行可观察数组。回调函数 on_nexton_erroron_completed 必须传递给 subscribe 方法。依次调用 subscribe 方法执行 test_observable() 函数。

不必向 subscribe() 方法传递所有三个回调函数。你可以根据需要传递 on_next()、on_error() 和 on_completed()。

lambda 函数用于 on_next、on_error 和 on_completed。它将采用参数并执行给定表达式。

以下是创建的可观察数组的输出 −

E:\pyrx>python testrx.py
Got - Hello
Job Done!
广告
© . All rights reserved.