RxJS - 快速指南



RxJS - 概述

本章介绍了关于 RxJS 的功能、优点和缺点的信息。在这里,我们还将学习何时使用 RxJS。

RxJS 的全称是 Reactive Extension for Javascript。它是一个 JavaScript 库,使用 Observables 来处理响应式编程,处理异步数据调用、回调和基于事件的程序。RxJS 可以与其他 JavaScript 库和框架一起使用。它支持 JavaScript,也支持 TypeScript。

什么是 RxJS?

根据 RxJS 官方网站的定义,它是一个用于使用可观察序列组合异步和基于事件的程序的库。它提供了一个核心类型 Observable、卫星类型(Observer、Schedulers、Subjects)和受 Array#extras(map、filter、reduce、every 等)启发的操作符,以便将异步事件作为集合进行处理。

RxJS 的特性

在 RxJS 中,以下概念负责处理异步任务:

Observable

Observable 是一个函数,它创建一个 Observer 并将其附加到期望获取值的源,例如,DOM 元素的点击、鼠标事件或 HTTP 请求等。

Observer

它是一个包含 next()、error() 和 complete() 方法的对象,当与 Observable 交互时,这些方法将被调用,例如,按钮点击、HTTP 请求等。

Subscription

创建 Observable 后,要执行 Observable,我们需要订阅它。它还可以用于取消执行。

操作符

操作符是一个纯函数,它接收 Observable 作为输入,输出也是一个 Observable。

Subject

Subject 是一个可以多播(即与多个 Observer 通信)的 Observable。考虑一个带有事件监听器的按钮,使用 addListener 附加到事件的函数在用户每次点击按钮时都会被调用,Subject 的功能与此类似。

Schedulers

调度器控制订阅何时开始和通知。

何时使用 RxJS?

如果您的项目包含大量异步任务处理,那么 RxJS 是一个不错的选择。它默认情况下与 Angular 项目一起加载。

使用 RxJS 的优点

以下是使用 RxJS 的优点:

  • RxJS 可以与其他 JavaScript 库和框架一起使用。它支持 JavaScript,也支持 TypeScript。一些示例包括 Angular、ReactJS、Vuejs、nodejs 等。

  • 在处理异步任务方面,RxJS 是一个很棒的库。RxJS 使用 Observables 来处理响应式编程,处理异步数据调用、回调和基于事件的程序。

  • RxJS 提供了大量操作符,包括数学、转换、过滤、实用程序、条件、错误处理、连接等类别,这使得在使用响应式编程时生活变得更加轻松。

使用 RxJS 的缺点

以下是使用 RxJS 的缺点:

  • 使用 Observables 调试代码有点困难。

  • 当您开始使用 Observables 时,最终可能会将您的整个代码都包装在 Observables 中。

RxJS - 环境搭建

在本章中,我们将安装 RxJS。要使用 RxJS,我们需要以下设置:

  • NodeJS
  • Npm
  • RxJS 包安装

NODEJS 和 NPM 安装

使用 npm 安装 RxJS 非常容易。您需要在系统上安装 nodejs 和 npm。要验证 NodeJS 和 npm 是否已安装在您的系统上,请尝试在命令提示符中执行以下命令。

E:\>node -v && npm -v
v10.15.1
6.4.1

如果您获取了版本号,则表示 nodejs 和 npm 已安装在您的系统上,并且系统上的当前版本分别为 10 和 6。

如果它没有打印任何内容,请在您的系统上安装 nodejs。要安装 nodejs,请访问 nodejs 的主页 https://node.org.cn/en/download/ 并根据您的操作系统安装软件包。

nodejs 的下载页面如下所示:

NodeJS

根据您的操作系统,安装所需的软件包。安装 nodejs 后,npm 也会随之安装。要检查 npm 是否已安装,请在终端中键入 npm –v。它应该显示 npm 的版本。

RxJS 包安装

要开始 RxJS 安装,首先创建一个名为 rxjsproj/ 的文件夹,我们将在其中练习所有 RxJS 示例。

创建 rxjsproj/ 文件夹后,运行命令 npm init,进行项目设置,如下所示

E:\>mkdir rxjsproj
E:\>cd rxjsproj
E:\rxjsproj>npm init

Npm init 命令在执行期间会询问一些问题,只需按 Enter 键并继续即可。npm init 执行完成后,它将在 rxjsproj/ 中创建 package.json,如下所示:

rxjsproj/
   package.json

现在您可以使用以下命令安装 rxjs:

npm install ---save-dev rxjs

E:\rxjsproj>npm install --save-dev rxjs
npm notice created a lockfile as package-lock.json. You should commit this file.

npm WARN [email protected] No description
npm WARN [email protected] No repository field.

+ [email protected]
added 2 packages from 7 contributors and audited 2 packages in 21.89s
found 0 vulnerabilities

我们完成了 RxJS 的安装。现在让我们尝试使用 RxJS,为此,在 rxjsproj/ 中创建一个名为 src/ 的文件夹

因此,现在我们的文件夹结构如下所示:

rxjsproj/
   node_modules/
   src/
   package.json

src/ 中创建一个名为 testrx.js 的文件,并编写以下代码:

testrx.js

import { of } from 'rxjs;
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`Output is: ${v}`));

当我们使用命令 node testrx.js 在命令提示符中执行上述代码时,它将显示导入错误,因为 nodejs 不知道如何处理导入。

要使导入与 nodejs 一起工作,我们需要使用 npm 安装 ES6 模块包,如下所示:

E:\rxjsproj\src>npm install --save-dev esm
npm WARN [email protected] No description
npm WARN [email protected] No repository field.

+ [email protected]
added 1 package from 1 contributor and audited 3 packages in 9.32s
found 0 vulnerabilities

安装软件包后,我们现在可以执行 testrx.js 文件,如下所示:

E:\rxjsproj\src>node -r esm testrx.js
Output is: 1
Output is: 4
Output is: 9

现在我们可以看到输出,它显示 RxJS 已安装并可以使用。上述方法将帮助我们在命令行中测试 RxJS。如果您想在浏览器中测试 RxJS,我们将需要一些额外的软件包。

在浏览器中测试 RxJS

在 rxjsproj/ 文件夹中安装以下软件包:

npm install --save-dev babel-loader @babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

E:\rxjsproj>npm install --save-dev babel-loader 
@babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

npm WARN [email protected] No description
npm WARN [email protected] No repository field.
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: [email protected]
(node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@
1.2.9: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})

+ [email protected]
+ [email protected]
+ @babel/[email protected]
+ @babel/[email protected]
+ [email protected]
+ [email protected]
added 675 packages from 373 contributors and audited 10225 packages in 255.567s
found 0 vulnerabilities

要启动服务器以执行我们的 Html 文件,我们将使用 webpack-server。package.json 中的 "publish" 命令将帮助我们启动并使用 webpack 打包所有 js 文件。最终用于的打包后的 js 文件保存在路径 /dev 文件夹中。

要使用 webpack,我们需要运行 npm run publish 命令,该命令已添加到 package.json 中,如下所示:

Package.json

{
   "name": "rxjsproj",
   "version": "1.0.0",
   "description": "",
   "main": "index.js",
   "scripts": {
      "publish":"webpack && webpack-dev-server --output-public=/dev/",
      "test": "echo \"Error: no test specified\" && exit 1"
   },
   "author": "",
   "license": "ISC",
   "devDependencies": {
      "@babel/core": "^7.6.0",
      "@babel/preset-env": "^7.6.0",
      "babel-loader": "^8.0.6",
      "esm": "^3.2.25",
      "rxjs": "^6.5.3",
      "webpack": "^4.39.3",
      "webpack-cli": "^3.3.8",
      "webpack-dev-server": "^3.8.0"
   }
}

要使用 webpack,我们必须首先创建一个名为 webpack.config.js 的文件,其中包含 webpack 工作的配置详细信息。

文件中的详细信息如下:

var path = require('path');

module.exports = {
   entry: {
      app: './src/testrx.js'
   },
   output: {
      path: path.resolve(__dirname, 'dev'),
      filename: 'main_bundle.js'
   },
   mode:'development',
   module: {
      rules: [
         {
            test:/\.(js)$/,
            include: path.resolve(__dirname, 'src'),
            loader: 'babel-loader',
            query: {
               presets: ['@babel/preset-env']
            }
         }
      ]
   }
};

文件的结构如上所示。它以一个提供当前路径详细信息的路径开头。

var path = require('path'); //gives the current path

接下来是 module.exports 对象,它具有 entry、output 和 module 属性。Entry 是起点。在这里,我们需要提供我们想要编译的起始 js 文件。

entry: {
   app: './src/testrx.js'
},

path.resolve(_dirname, ‘src/testrx.js’) -- 将在目录中查找 src 文件夹,并在该文件夹中查找 testrx.js。

Output

output: {
   path: path.resolve(__dirname, 'dev'),
   filename: 'main_bundle.js'
},

输出是一个包含 path 和 filename 属性的对象。path 将保存编译文件将保存在其中的文件夹,filename 将告诉最终文件在您的 .html 文件中使用的名称。

Module

module: {
   rules: [
      {
         test:/\.(js)$/,
         include: path.resolve(__dirname, 'src'),
         loader: 'babel-loader',
         query: {
            presets: ['@babel/preset-env']
         }
      }
   ]
}

Module 是一个包含 rules 属性的对象,该属性具有 test、include、loader、query 属性。test 将保存所有以 .js 和 .jsx 结尾的 js 文件的详细信息。它具有将在给定的入口点中查找 .js 结尾的模式。

Include 指示要用于查找文件的文件夹。

Loader 使用 babel-loader 编译代码。

Query 具有 presets 属性,它是一个数组,其值为 '@babel/preset-env'。它将根据您需要的 ES 环境转换代码。

最终的文件夹结构如下所示:

rxjsproj/
   node_modules/
   src/
      testrx.js
   index.html
   package.json
   webpack.config.js

运行命令

npm run publish 将在其中创建 dev/ 文件夹和 main_bundle.js 文件。服务器将启动,您可以在浏览器中测试您的 index.html,如下所示。

Run Command

打开浏览器并访问 url: https://127.0.0.1:8080/

Main Bundle

输出显示在控制台中。

RxJS - 最新更新

在本教程中,我们使用的是 RxJS 版本 6。RxJS 通常用于处理响应式编程,并且更常与 Angular、ReactJS 一起使用。Angular 6 默认加载 rxjs6。

与版本 6 相比,RxJS 版本 5 的处理方式有所不同。如果您将 RxJS 5 更新到 6,代码将中断。在本章中,我们将了解处理版本更新的不同方法。

如果您正在将 RxJS 更新到 6 并且不想更改代码,您也可以这样做,并且需要安装以下软件包。

npm install --save-dev rxjs-compact

此软件包将负责提供向后兼容性,旧代码将可以与 RxJS 版本 6 正常工作。如果您想更改与 RxJS 6 兼容的代码,以下是要进行的更改。

操作符、Observables、主题的软件包已重新构建,因此导入方面的主要更改如下所述。

操作符导入

根据版本 5,对于操作符,应包含以下导入语句:

import 'rxjs/add/operator/mapTo'
import 'rxjs/add/operator/take'
import 'rxjs/add/operator/tap'
import 'rxjs/add/operator/map'

在 RxJS 版本 6 中,导入将如下所示:

import {mapTo, take, tap, map} from "rxjs/operators"

创建 Observables 的方法导入

根据版本 5,在使用 Observables 时,应包含以下导入方法:

import "rxjs/add/observable/from";
import "rxjs/add/observable/of";
import "rxjs/add/observable/fromEvent";
import "rxjs/add/observable/interval";

在 RxJS 版本 6 中,导入将如下所示:

import {from, of, fromEvent, interval} from 'rxjs';

Observables 导入

在 RxJS 版本 5 中,在使用 Observables 时,应包含以下导入语句:

import { Observable } from 'rxjs/Observable'

在 RxJS 版本 6 中,导入将如下所示:

import { Observable } from 'rxjs'

Subject 导入

在 RxJS 版本 5 中,Subject 应包含如下所示:

import { Subject} from 'rxjs/Subject'

在 RxJS 版本 6 中,导入将如下所示:

import { Subject } from 'rxjs'

如何在 RxJS 6 中使用操作符?

pipe() 方法 可用于创建的 Observable。它从版本 5.5 开始添加到 RxJS 中。现在使用 pipe(),您可以按顺序一起处理多个操作符。以下是 RxJS 版本 5 中操作符的使用方式。

示例

import "rxjs/add/observable/from";
import 'rxjs/add/operator/max'

let list1 = [1, 6, 15, 10, 58, 2, 40];
from(list1).max((a,b)=>a-b).subscribe(x => console.log("The Max value is "+x));

从 RxJS 版本 5.5 开始,我们必须使用 pipe() 来执行操作符:

示例

import { from } from 'rxjs';
import { max } from 'rxjs/operators';

from(list1).pipe(max((a,b)=>a-b)).subscribe(x => console.log(
   "The Max value is "+x)
);

操作符重命名

在软件包重构期间,一些操作符被重命名,因为它们与 JavaScript 关键字冲突或匹配。列表如下所示:

操作符 重命名为
do() tap()
catch() catchError()
switch() switchAll()
finally() finalize()
throw() throwError()

RxJS - Observables

Observable 是一个函数,它创建一个 Observer 并将其附加到期望获取值的源,例如,DOM 元素的点击、鼠标事件或 HTTP 请求等。

观察者是一个带有回调函数的对象,当 Observable 发生交互时,它将被调用,即数据源发生了交互,例如按钮点击、Http 请求等。

本章我们将讨论以下主题:

  • 创建 Observable
  • 订阅 Observable
  • 执行 Observable

创建 Observable

Observable 可以使用 Observable 构造函数创建,也可以使用 Observable create 方法创建,并将 subscribe 函数作为参数传递给它,如下所示:

testrx.js

import { Observable } from 'rxjs';

var observable = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

我们创建了一个 Observable,并使用 Observable 内部的 subscriber.next 方法添加了一条消息“My First Observable”。

我们也可以使用 Observable.create() 方法创建 Observable,如下所示:

testrx.js

import { Observable } from 'rxjs';
var observer = Observable.create(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

订阅 Observable

您可以如下订阅 Observable:

testrx.js

import { Observable } from 'rxjs';

var observer = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);
observer.subscribe(x => console.log(x));

当观察者订阅时,它将开始执行 Observable。

这是我们在浏览器控制台中看到的:

Subscribe Observable

执行 Observable

Observable 在被订阅时执行。观察者是一个带有三个方法的对象,这些方法会收到通知:

next() - 此方法将发送值,例如数字、字符串、对象等。

complete() - 此方法不会发送任何值,并指示 Observable 已完成。

error() - 如果有任何错误,此方法将发送错误。

让我们创建包含所有三个通知的 Observable 并执行它。

testrx.js

import { Observable } from 'rxjs';
var observer = new Observable(
   function subscribe(subscriber) {
      try {
         subscriber.next("My First Observable");
         subscriber.next("Testing Observable");
         subscriber.complete();
      } catch(e){
         subscriber.error(e);
      }
   }
);
observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

在上面的代码中,我们添加了 next、complete 和 error 方法。

try{
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
} catch(e){
   subscriber.error(e);
}

要执行 next、complete 和 error,我们必须调用 subscribe 方法,如下所示:

observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

只有在发生错误时才会调用 error 方法。

这是在浏览器中看到的输出:

Execute Observable

RxJS - 操作符

操作符是 RxJS 的重要组成部分。操作符是一个纯函数,它接收 Observable 作为输入,输出也是一个 Observable。

使用操作符

操作符是一个纯函数,它接收 Observable 作为输入,输出也是一个 Observable。

要使用操作符,我们需要 pipe() 方法。

使用 pipe() 的示例

let obs = of(1,2,3); // an observable
obs.pipe(
   operator1(),
   operator2(),
   operator3(),
   operator3(),
)

在上面的示例中,我们使用 of() 方法创建了一个 Observable,该方法接收值 1、2 和 3。现在,在这个 Observable 上,您可以使用 pipe() 方法使用任意数量的操作符执行不同的操作,如上所示。操作符的执行将按顺序在给定的 Observable 上进行。

下面是一个工作示例:

import { of } from 'rxjs';
import { map, reduce, filter } from 'rxjs/operators';

let test1 = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
let case1 = test1.pipe(
   filter(x => x % 2 === 0),
   reduce((acc, one) => acc + one, 0)
)
case1.subscribe(x => console.log(x));

Output

30

在上面的示例中,我们使用了 filter 操作符,它过滤偶数,接下来我们使用了 reduce() 操作符,它将添加偶数值并在订阅时给出结果。

以下是我们将要讨论的 Observable 列表。

  • 创建
  • 数学
  • 连接
  • 转换
  • 过滤
  • 实用程序
  • 条件
  • 多播
  • 错误处理

创建操作符

以下是我们将在创建操作符类别中讨论的操作符:

序号 操作符和描述
1 ajax

此操作符将为给定的 URL 发起一个 ajax 请求。

2 from

此操作符将从数组、类数组对象、Promise、可迭代对象或类 Observable 对象创建 Observable。

3 fromEvent

此操作符将输出一个 Observable,该 Observable 将用于发出事件的元素,例如按钮、点击等。

4 fromEventPattern

此操作符将从用于注册事件处理程序的输入函数创建 Observable。

5 interval

此操作符将为给定时间内的每次创建 Observable。

6 of

此操作符将接收传递的参数并将其转换为 Observable。

7 range

此操作符将创建一个 Observable,该 Observable 将根据提供的范围为您提供一系列数字。

8 throwError

此操作符将创建一个 Observable,该 Observable 将通知错误。

9 timer

此操作符将创建一个 Observable,该 Observable 将在超时后发出值,并且每个调用后值将持续增加。

10 iif

此操作符将决定订阅哪个 Observable。

数学操作符

以下是我们将在数学操作符类别中讨论的操作符:

序号 操作符和描述
1 Count

count() 操作符接收一个包含值的 Observable,并将其转换为一个将发出单个值的 Observable。

2 Max

Max 方法将接收一个包含所有值的 Observable,并返回一个包含最大值的 Observable。

3 Min

Min 方法将接收一个包含所有值的 Observable,并返回一个包含最小值的 Observable。

4 Reduce

在 reduce 操作符中,累加器函数用于输入 Observable,累加器函数将以 Observable 的形式返回累加值,并可选地将种子值传递给累加器函数。

reduce() 函数将接收两个参数,一个累加器函数,第二个是种子值。

连接操作符

以下是我们将在连接操作符类别中讨论的操作符。

序号 操作符和描述
1 concat

此操作符将按顺序发出作为输入给出的 Observable,然后继续下一个。

2 forkJoin

此操作符将接收数组或字典对象作为输入,并将等待 Observable 完成,然后返回从给定 Observable 发出的最后一个值。

3 merge

此操作符将接收输入 Observable,并将发出来自 Observable 的所有值,并发出一个输出 Observable。

4 race

它将返回一个 Observable,该 Observable 将是第一个源 Observable 的镜像副本。

转换操作符

以下是我们将在转换操作符类别中讨论的操作符。

序号 操作符和描述
1 buffer

buffer 操作在一个 Observable 上,并接收一个 Observable 作为参数。它将开始将其原始 Observable 上发出的值缓冲到数组中,并在作为参数提供的 Observable 发出时发出这些值。一旦作为参数提供的 Observable 发出,缓冲区将重置并再次开始在原始 Observable 上缓冲,直到输入 Observable 发出,然后相同的场景重复。

2 bufferCount

在 buffercount() 操作符的情况下,它将收集在其调用的 Observable 上的值,并在给定给 buffercount 的缓冲区大小匹配时发出这些值。

3 bufferTime

这类似于 bufferCount,因此在这里,它将收集在其调用的 Observable 上的值,并在完成 bufferTimeSpan 时发出这些值。它接收一个参数,即 bufferTimeSpan

4 bufferToggle

在 bufferToggle() 的情况下,它接收两个参数,openings 和 closingSelector。opening 参数是可订阅的或 Promise 以启动缓冲区,第二个参数 closingSelector 又是可订阅的或 Promise,指示关闭缓冲区并发出收集的值。

5 bufferWhen

此操作符将以数组形式给出值,它接收一个函数作为参数,该函数将决定何时关闭、发出和重置缓冲区。

6 expand

expand 操作符接收一个函数作为参数,该函数递归地应用于源 Observable 以及输出 Observable。最终值为一个 Observable。

7 groupBy

在 groupBy 操作符中,输出根据特定条件进行分组,这些组项目作为 GroupedObservable 发出。

8 map

在 map 操作符的情况下,投影函数应用于源 Observable 上的每个值,并将相同的输出作为 Observable 发出。

9 mapTo

每次源 Observable 发出值时,都会输出一个常数值以及 Observable。

10 mergeMap

在 mergeMap 操作符的情况下,投影函数应用于每个源值,其输出与输出 Observable 合并。

11 switchMap

在 switchMap 操作符的情况下,投影函数应用于每个源值,其输出与输出 Observable 合并,并且给定的值为最新的投影 Observable。

12 window

它接收一个参数 windowboundaries,它是一个 Observable,并在给定的 windowboundaries 发出时返回一个嵌套的 Observable。

过滤操作符

以下是我们将在过滤操作符类别中讨论的操作符。

序号 操作符和描述
1 debounce

源 Observable 在一段时间后发出的值,并且发射由作为 Observable 或 Promise 给出的另一个输入确定。

2 debounceTime

它将仅在时间完成后从源 Observable 发出值。

3 distinct

此操作符将给出来自源 Observable 的所有与前一个值相比不同的值。

4 elementAt

此操作符将根据给定的索引从源 Observable 给出一个值。

5 filter

此操作符将根据给定的谓词函数过滤来自源 Observable 的值。

6 first

此操作符将给出源 Observable 发出的第一个值。

7 last

此操作符将给出源 Observable 发出的最后一个值。

8 ignoreElements

此操作符将忽略来自源 Observable 的所有值,并且仅执行对 complete 或 error 回调函数的调用。

9 sample

此操作符将给出来自源 Observable 的最新值,并且输出将取决于传递给它的参数的发射。

10 skip

此操作符将返回一个 Observable,该 Observable 将跳过作为输入接收的前 count 个项目的第一个出现。

11 throttle

此操作符将输出以及忽略来自源 Observable 的值,持续时间由作为参数接收的输入函数确定,并且将重复相同的过程。

实用程序操作符

以下是我们将在实用程序操作符类别中讨论的操作符。

序号 操作符和描述
1 tap

此操作符将输出与源 Observable 相同,并且可用于从 Observable 向用户记录值。主要值、任何错误或任务是否完成。

2 delay

此操作符根据给定的超时延迟源 Observable 发出的值。

3 delayWhen

此操作符根据来自作为输入接收的另一个 Observable 的超时延迟源 Observable 发出的值。

4 observeOn

此操作符基于输入调度程序将重新发出来自源 Observable 的通知。

5 subscribeOn

此操作符有助于根据作为输入接收的调度程序异步订阅源 Observable。

6 timeInterval

此操作符将返回一个对象,该对象包含当前值以及使用接收的调度程序输入计算的当前值和前一个值之间经过的时间。

7 timestamp

返回与来自源 Observable 的值一起的时间戳,该时间戳说明了值发出时间。

8 timeout

如果源 Observable 在给定的超时后未发出值,则此操作符将抛出错误。

9 toArray

累积来自 Observable 的所有源值,并在源完成时将其作为数组输出。

条件操作符

下面我们将讨论条件运算符类别中的运算符。

序号 操作符和描述
1 defaultIfEmpty

如果源 Observable 为空,此运算符将返回一个默认值。

2 every

它将根据输入函数是否满足源 Observable 上每个值的条件返回一个 Observable。

3 find

当源 Observable 的第一个值满足作为输入的谓词函数的条件时,这将返回该 Observable。

4 findIndex

此操作符基于输入调度程序将重新发出来自源 Observable 的通知。

5 isEmpty

如果输入 Observable 在不发出任何值的情况下完成回调,则此运算符将输出 true;如果输入 Observable 发出任何值,则输出 false。

多播运算符

下面我们将讨论多播运算符类别中的运算符。

序号 操作符和描述
1 multicast

multicast 运算符共享与其他订阅者创建的单个订阅。multicast 接收的参数是一个主题或一个返回 ConnectableObservable 的工厂方法,该方法具有 connect() 方法。要订阅,必须调用 connect() 方法。

2 publish

此运算符返回 ConnectableObservable,需要使用 connect() 方法订阅 Observable。

3 publishBehavior

publishBehavior 使用 BehaviourSubject,并返回 ConnectableObservable。必须使用 connect() 方法订阅创建的 Observable。

4 publishLast

publishBehaviour 使用 AsyncSubject,并返回 ConnectableObservable。必须使用 connect() 方法订阅创建的 Observable。

5 publishReplay

publishReplay 使用行为主题,其中它可以缓冲值并将相同的值重播给新的订阅者,并返回 ConnectableObservable。必须使用 connect() 方法订阅创建的 Observable。

6 share

它是 mutlicast() 运算符的别名,唯一的区别是您不必手动调用 connect() 方法来启动订阅。

错误处理运算符

下面我们将讨论错误处理运算符类别中的运算符。

序号 操作符和描述
1 catchError

此运算符负责通过返回新的 Observable 或错误来捕获源 Observable 上的错误。

2 retry

如果源 Observable 发生错误,此运算符将负责重新尝试源 Observable,并且重试将根据给定的输入计数进行。

RxJS - 使用订阅

创建 Observable 后,要执行 Observable,我们需要订阅它。

count() 运算符

这是一个关于如何订阅 Observable 的简单示例。

示例 1

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
final_val.subscribe(x => console.log("The count is "+x));

Output

The count is 6

订阅有一个名为 unsubscribe() 的方法。调用 unsubscribe() 方法将删除用于该 Observable 的所有资源,即 Observable 将被取消。这是一个使用 unsubscribe() 方法的工作示例。

示例 2

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
let test = final_val.subscribe(x => console.log("The count is "+x));
test.unsubscribe();

订阅存储在变量 test 中。我们使用了 test.unsubscribe() 取消订阅 Observable。

Output

The count is 6

RxJS - 使用主题

主题是一个可以多播(即与许多观察者通信)的 Observable。考虑一个带有事件侦听器的按钮,使用 addListener 附加到事件的函数在用户每次单击按钮时都会被调用,主题的功能也类似。

我们将在本章中讨论以下主题:

  • 创建主题
  • Observable 和 Subject 之间有什么区别?
  • Behaviour Subject
  • Replay Subject
  • AsyncSubject

创建主题

要使用主题,我们需要导入 Subject,如下所示:

import { Subject } from 'rxjs';

您可以如下创建主题对象:

const subject_test = new Subject();

该对象是一个观察者,它具有三个方法:

  • next(v)
  • error(e)
  • complete()

订阅主题

您可以像下面这样在主题上创建多个订阅:

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

订阅注册到主题对象,就像我们之前讨论过的 addListener 一样。

将数据传递到主题

您可以使用 next() 方法将数据传递到创建的主题。

subject_test.next("A");

数据将传递到主题上添加的所有订阅。

示例

这是一个主题的工作示例:

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

主题_test 对象是通过调用 new Subject() 创建的。主题_test 对象引用 next()、error() 和 complete() 方法。上面示例的输出如下所示:

Output

Passing Data

我们可以使用 complete() 方法停止主题执行,如下所示。

示例

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

一旦我们调用 complete,稍后调用的 next 方法将不会被调用。

Output

Passing Data Method

现在让我们看看如何调用 error() 方法。

示例

下面是一个工作示例:

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

Output

Passing Data Error

Observable 和 Subject 之间有什么区别?

一个 Observable 将一对一地与订阅者通信。任何时候您订阅 Observable,执行都将从头开始。以使用 ajax 发出的 Http 调用为例,以及两个调用 Observable 的订阅者。您将在浏览器网络选项卡中看到 2 个 Http 请求。

示例

这是一个相同的工作示例:

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));

Output

Observable

Observable Ex

现在,这里的问题是,我们希望共享相同的数据,但不是以 2 个 Http 调用的代价。我们希望发出一个 Http 调用并在订阅者之间共享数据。

这可以通过主题来实现。它是一个可以多播(即与许多观察者通信)的 Observable。它可以在订阅者之间共享值。

示例

这是一个使用主题的工作示例:

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

Output

Observable possible

现在您可以看到只有一个 Http 调用,并且相同的数据在调用的订阅者之间共享。

Observable subscribers

Behaviour Subject

Behaviour Subject 在被调用时将为您提供最新值。

您可以如下创建行为主题:

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject"); 
// initialized the behaviour subject with value:Testing Behaviour Subject

示例

这是一个使用 Behaviour Subject 的工作示例:

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject"); 
// 0 is the initial value

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

Output

Behaviour Subject

Replay Subject

ReplaySubject 类似于 Behaviour Subject,它可以缓冲值并将相同的值重播给新的订阅者。

示例

这是一个 Replay Subject 的工作示例:

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2); 
// buffer 2 values but new subscribers

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

Replay Subject 上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。

Output

Replay Subject

AsyncSubject

在 AsyncSubject 的情况下,最后一个调用的值将传递给订阅者,并且只有在调用 complete() 方法后才会完成。

示例

这是一个相同的工作示例:

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

这里,在调用 complete 之前,传递给主题的最后一个值为 2,并且将其提供给订阅者。

Output

Async Subject

RxJS - 使用调度器

调度器控制订阅何时开始和通知。

要使用调度程序,我们需要以下内容:

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

这是一个工作示例,其中我们将使用调度程序来决定执行。

示例

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

var observable = new Observable(function subscribe(subscriber) {
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
}).pipe(
   observeOn(asyncScheduler)
);
console.log("Observable Created");
observable.subscribe(
   x => console.log(x),
   (e)=>console.log(e),
   ()=>console.log("Observable is complete")
);

console.log('Observable Subscribed');

Output

Scheduler

如果没有调度程序,输出将如下所示:

Scheduler Controls

使用 RxJS 和 Angular

在本章中,我们将了解如何在 Angular 中使用 RxJs。我们这里不会介绍 Angular 的安装过程,要了解 Angular 安装,请参考此链接:https://tutorialspoint.com/angular7/angular7_environment_setup.htm

我们将直接在一个示例上进行操作,其中将使用来自 RxJS 的 Ajax 加载数据。

示例

app.component.ts

import { Component } from '@angular/core';
import { environment } from './../environments/environment';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators'

@Component({
   selector: 'app-root',
   templateUrl: './app.component.html',
   styleUrls: ['./app.component.css']
})
export class AppComponent {
   title = '';
   data;
   constructor() {
      this.data = "";
      this.title = "Using RxJs with Angular";
      let a = this.getData();
   }
   getData() {
      const response =
      ajax('https://jsonplaceholder.typicode.com/users')
         .pipe(map(e => e.response));
      response.subscribe(res => {
         console.log(res);
         this.data = res;
      });
   }
}

app.component.html

<div>
   <h3>{{title}}</h3>
   <ul *ngFor="let i of data">
      <li>{{i.id}}: {{i.name}}</li>
   </ul>
</div>

<router-outlet></router-outlet>

我们使用了来自 RxJS 的 ajax,它将从以下 url 加载数据: https://jsonplaceholder.typicode.com/users

编译后,显示如下:

RxJs with Angular

使用 RxJS 和 ReactJS

在本章中,我们将了解如何在 ReactJS 中使用 RxJs。我们这里不会介绍 Reactjs 的安装过程,要了解 ReactJS 安装,请参考此链接:https://tutorialspoint.com/reactjs/reactjs_environment_setup.htm

示例

我们将直接在下面的示例上进行操作,其中将使用来自 RxJS 的 Ajax 加载数据。

index.js

import React, { Component } from "react";
import ReactDOM from "react-dom";
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
class App extends Component {
   constructor() {
      super();
      this.state = { data: [] };
   }
   componentDidMount() {
      const response = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
      response.subscribe(res => {
         this.setState({ data: res });
      });
   }
   render() {
      return (
         <div>
            <h3>Using RxJS with ReactJS</h3>
            <ul>
               {this.state.data.map(el => (
                  <li>
                     {el.id}: {el.name}
                  </li>
               ))}
            </ul>
         </div>
      );
   }
}
ReactDOM.render(<App />, document.getElementById("root"));

index.html

<!DOCTYPE html>
<html>
   <head>
      <meta charset = "UTF-8" />
      <title>ReactJS Demo</title>
   <head>
   <body>
      <div id = "root"></div>
   </body>
</html>

我们使用了来自 RxJS 的 ajax,它将从以下 Url 加载数据: https://jsonplaceholder.typicode.com/users

编译后,显示如下:

RxJs with ReactJS
广告