RxJS - 使用 Subjects
Subject 是一种可以多播(即与多个观察者通信)的 Observable。考虑一个带有事件监听器的按钮,使用 addListener 附加到事件的函数在用户每次点击按钮时都会被调用,Subject 的功能与此类似。
在本章中,我们将讨论以下主题:
- 创建 Subject
- Observable 和 Subject 之间的区别是什么?
- Behaviour Subject
- Replay Subject
- AsyncSubject
创建 Subject
要使用 Subject,我们需要导入 Subject,如下所示:
import { Subject } from 'rxjs';
您可以如下创建 Subject 对象:
const subject_test = new Subject();
该对象是一个 Observer,具有三个方法:
- next(v)
- error(e)
- complete()
订阅 Subject
您可以在 Subject 上创建多个订阅,如下所示:
subject_test.subscribe({ next: (v) => console.log(`From Subject : ${v}`) }); subject_test.subscribe({ next: (v) => console.log(`From Subject: ${v}`) });
订阅注册到 Subject 对象,就像我们之前讨论的 addListener 一样。
向 Subject 传递数据
您可以使用 next() 方法向创建的 Subject 传递数据。
subject_test.next("A");
数据将传递到添加到 Subject 上的所有订阅。
示例
这是一个 Subject 的工作示例:
import { Subject } from 'rxjs'; const subject_test = new Subject(); subject_test.subscribe({ next: (v) => console.log(`From Subject : ${v}`) }); subject_test.subscribe({ next: (v) => console.log(`From Subject: ${v}`) }); subject_test.next("A"); subject_test.next("B");
subject_test 对象是通过调用 new Subject() 创建的。subject_test 对象引用了 next()、error() 和 complete() 方法。上面示例的输出如下所示:
输出
我们可以使用 complete() 方法停止 Subject 的执行,如下所示。
示例
import { Subject } from 'rxjs'; const subject_test = new Subject(); subject_test.subscribe({ next: (v) => console.log(`From Subject : ${v}`) }); subject_test.subscribe({ next: (v) => console.log(`From Subject: ${v}`) }); subject_test.next("A"); subject_test.complete(); subject_test.next("B");
一旦我们调用 complete,稍后调用的 next 方法将不会被调用。
输出
现在让我们看看如何调用 error() 方法。
示例
下面是一个工作示例:
import { Subject } from 'rxjs'; const subject_test = new Subject(); subject_test.subscribe({ error: (e) => console.log(`From Subject : ${e}`) }); subject_test.subscribe({ error: (e) => console.log(`From Subject : ${e}`) }); subject_test.error(new Error("There is an error"));
输出
Observable 和 Subject 之间的区别是什么?
Observable 将一对一地与订阅者通信。每当您订阅 Observable 时,执行将从头开始。以使用 ajax 发出的 Http 调用为例,以及两个调用 Observable 的订阅者。您将在浏览器网络选项卡中看到两个 Http 请求。
示例
这是一个相同功能的工作示例:
import { ajax } from 'rxjs/ajax'; import { map } from 'rxjs/operators'; let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response)); let subscriber1 = final_val.subscribe(a => console.log(a)); let subscriber2 = final_val.subscribe(a => console.log(a));
输出
现在,问题在于,我们希望共享相同的数据,但不想为此付出发出两个 Http 调用的代价。我们希望发出一个 Http 调用并在订阅者之间共享数据。
这可以通过 Subjects 实现。它是一个可以多播(即与多个观察者通信)的 Observable。它可以在订阅者之间共享值。
示例
这是一个使用 Subjects 的工作示例:
import { Subject } from 'rxjs'; import { ajax } from 'rxjs/ajax'; import { map } from 'rxjs/operators'; const subject_test = new Subject(); subject_test.subscribe({ next: (v) => console.log(v) }); subject_test.subscribe({ next: (v) => console.log(v) }); let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response)); let subscriber = final_val.subscribe(subject_test);
输出
现在您可以看到只有一个 Http 调用,并且相同的数据在调用的订阅者之间共享。
Behaviour Subject
Behaviour Subject 会在调用时为您提供最新值。
您可以如下创建 Behaviour Subject:
import { BehaviorSubject } from 'rxjs'; const subject = new BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
示例
这是一个使用 Behaviour Subject 的工作示例:
import { BehaviorSubject } from 'rxjs'; const behavior_subject = new BehaviorSubject("Testing Behaviour Subject"); // 0 is the initial value behavior_subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); behavior_subject.next("Hello"); behavior_subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); behavior_subject.next("Last call to Behaviour Subject");
输出
Replay Subject
Replay Subject 类似于 Behaviour Subject,它可以缓冲值并将这些值重播给新的订阅者。
示例
这是一个 Replay Subject 的工作示例:
import { ReplaySubject } from 'rxjs'; const replay_subject = new ReplaySubject(2); // buffer 2 values but new subscribers replay_subject.subscribe({ next: (v) => console.log(`Testing Replay Subject A: ${v}`) }); replay_subject.next(1); replay_subject.next(2); replay_subject.next(3); replay_subject.subscribe({ next: (v) => console.log(`Testing Replay Subject B: ${v}`) }); replay_subject.next(5);
Replay Subject 上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。
输出
AsyncSubject
对于 AsyncSubject,最后一个调用的值将传递给订阅者,并且只有在调用 complete() 方法后才会执行此操作。
示例
这是一个相同功能的工作示例:
import { AsyncSubject } from 'rxjs'; const async_subject = new AsyncSubject(); async_subject.subscribe({ next: (v) => console.log(`Testing Async Subject A: ${v}`) }); async_subject.next(1); async_subject.next(2); async_subject.complete(); async_subject.subscribe({ next: (v) => console.log(`Testing Async Subject B: ${v}`) });
在这里,在调用 complete 之前,传递给 Subject 的最后一个值为 2,并且该值也传递给订阅者。