RxPY - 使用 Subject
Subject 既是一个可观察序列,也是一个可以多播(即与已订阅的多个观察者通信)的观察者。
我们将讨论以下关于 Subject 的主题:
- 创建 Subject
- 订阅 Subject
- 向 Subject 传递数据
- BehaviorSubject
- ReplaySubject
- AsyncSubject
创建 Subject
要使用 Subject,我们需要导入 Subject,如下所示:
from rx.subject import Subject
您可以如下创建 Subject 对象:
subject_test = Subject()
该对象是一个观察者,具有三种方法:
- on_next(value)
- on_error(error) 和
- on_completed()
订阅 Subject
您可以在 Subject 上创建多个订阅,如下所示:
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
向 Subject 传递数据
您可以使用 on_next(value) 方法向创建的 Subject 传递数据,如下所示:
subject_test.on_next("A")
subject_test.on_next("B")
数据将传递给添加到 Subject 上的所有订阅。
这是一个 Subject 的工作示例。
示例
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
subject_test 对象通过调用 Subject() 创建。subject_test 对象引用了 on_next(value)、on_error(error) 和 on_completed() 方法。以上示例的输出如下所示:
输出
E:\pyrx>python testrx.py The value is A The value is A The value is B The value is B
我们可以使用 on_completed() 方法停止 Subject 执行,如下所示。
示例
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
一旦我们调用 complete,稍后调用的 next 方法将不会被调用。
输出
E:\pyrx>python testrx.py The value is A The value is A
现在让我们看看如何调用 on_error(error) 方法。
示例
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))
输出
E:\pyrx>python testrx.py Error: There is an Error! Error: There is an Error!
BehaviorSubject
BehaviorSubject 在被调用时会提供最新的值。您可以如下创建 BehaviorSubject:
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这是一个使用 BehaviorSubject 的工作示例
示例
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
输出
E:\pyrx>python testrx.py Observer A : Testing Behaviour Subject Observer A : Hello Observer B : Hello Observer A : Last call to Behaviour Subject Observer B : Last call to Behaviour Subject
Replay Subject
ReplaySubject 类似于 BehaviorSubject,它可以缓冲值并将这些值重播给新的订阅者。这是一个 ReplaySubject 的工作示例。
示例
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
ReplaySubject 上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。
输出
E:\pyrx>python testrx.py Testing Replay Subject A: 1 Testing Replay Subject A: 2 Testing Replay Subject A: 3 Testing Replay Subject B: 2 Testing Replay Subject B: 3 Testing Replay Subject A: 5 Testing Replay Subject B: 5
AsyncSubject
在 AsyncSubject 的情况下,最后一个调用的值将传递给订阅者,并且只有在调用 complete() 方法后才会执行此操作。
示例
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
输出
E:\pyrx>python testrx.py Testing Async Subject A: 2 Testing Async Subject B: 2