RxJS - 使用 Subjects
Subject 是一种可以多播(即与多个观察者通信)的 Observable。考虑一个带有事件监听器的按钮,使用 addListener 附加到事件的函数在用户每次点击按钮时都会被调用,Subject 的功能与此类似。
在本章中,我们将讨论以下主题:
- 创建 Subject
- Observable 和 Subject 之间的区别是什么?
- Behaviour Subject
- Replay Subject
- AsyncSubject
创建 Subject
要使用 Subject,我们需要导入 Subject,如下所示:
import { Subject } from 'rxjs';
您可以如下创建 Subject 对象:
const subject_test = new Subject();
该对象是一个 Observer,具有三个方法:
- next(v)
- error(e)
- complete()
订阅 Subject
您可以在 Subject 上创建多个订阅,如下所示:
subject_test.subscribe({
next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
next: (v) => console.log(`From Subject: ${v}`)
});
订阅注册到 Subject 对象,就像我们之前讨论的 addListener 一样。
向 Subject 传递数据
您可以使用 next() 方法向创建的 Subject 传递数据。
subject_test.next("A");
数据将传递到添加到 Subject 上的所有订阅。
示例
这是一个 Subject 的工作示例:
import { Subject } from 'rxjs';
const subject_test = new Subject();
subject_test.subscribe({
next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");
subject_test 对象是通过调用 new Subject() 创建的。subject_test 对象引用了 next()、error() 和 complete() 方法。上面示例的输出如下所示:
输出
我们可以使用 complete() 方法停止 Subject 的执行,如下所示。
示例
import { Subject } from 'rxjs';
const subject_test = new Subject();
subject_test.subscribe({
next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");
一旦我们调用 complete,稍后调用的 next 方法将不会被调用。
输出
现在让我们看看如何调用 error() 方法。
示例
下面是一个工作示例:
import { Subject } from 'rxjs';
const subject_test = new Subject();
subject_test.subscribe({
error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));
输出
Observable 和 Subject 之间的区别是什么?
Observable 将一对一地与订阅者通信。每当您订阅 Observable 时,执行将从头开始。以使用 ajax 发出的 Http 调用为例,以及两个调用 Observable 的订阅者。您将在浏览器网络选项卡中看到两个 Http 请求。
示例
这是一个相同功能的工作示例:
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));
输出
现在,问题在于,我们希望共享相同的数据,但不想为此付出发出两个 Http 调用的代价。我们希望发出一个 Http 调用并在订阅者之间共享数据。
这可以通过 Subjects 实现。它是一个可以多播(即与多个观察者通信)的 Observable。它可以在订阅者之间共享值。
示例
这是一个使用 Subjects 的工作示例:
import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
const subject_test = new Subject();
subject_test.subscribe({
next: (v) => console.log(v)
});
subject_test.subscribe({
next: (v) => console.log(v)
});
let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);
输出
现在您可以看到只有一个 Http 调用,并且相同的数据在调用的订阅者之间共享。
Behaviour Subject
Behaviour Subject 会在调用时为您提供最新值。
您可以如下创建 Behaviour Subject:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject");
// initialized the behaviour subject with value:Testing Behaviour Subject
示例
这是一个使用 Behaviour Subject 的工作示例:
import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject");
// 0 is the initial value
behavior_subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
behavior_subject.next("Hello");
behavior_subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");
输出
Replay Subject
Replay Subject 类似于 Behaviour Subject,它可以缓冲值并将这些值重播给新的订阅者。
示例
这是一个 Replay Subject 的工作示例:
import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2);
// buffer 2 values but new subscribers
replay_subject.subscribe({
next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});
replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});
replay_subject.next(5);
Replay Subject 上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。
输出
AsyncSubject
对于 AsyncSubject,最后一个调用的值将传递给订阅者,并且只有在调用 complete() 方法后才会执行此操作。
示例
这是一个相同功能的工作示例:
import { AsyncSubject } from 'rxjs';
const async_subject = new AsyncSubject();
async_subject.subscribe({
next: (v) => console.log(`Testing Async Subject A: ${v}`)
});
async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
next: (v) => console.log(`Testing Async Subject B: ${v}`)
});
在这里,在调用 complete 之前,传递给 Subject 的最后一个值为 2,并且该值也传递给订阅者。
输出