RxJava 快速指南



RxJava - 概述

RxJava 是一个基于 Java 的 ReactiveX 扩展。它提供了 ReactiveX 项目在 Java 中的实现。以下是 RxJava 的主要特征。

  • 扩展观察者模式。

  • 支持数据/事件序列。

  • 提供运算符以声明方式将序列组合在一起。

  • 在内部处理线程、同步、线程安全和并发数据结构。

什么是 ReactiveX?

ReactiveX 是一个旨在为各种编程语言提供响应式编程概念的项目。响应式编程指的是程序在数据出现时做出反应的场景。它是一种基于事件的编程概念,事件可以传播到注册观察者。

根据 **响应式** 的说法,他们结合了观察者模式、迭代器模式和函数式模式的优点。

正确实现的观察者模式。ReactiveX 结合了观察者模式、迭代器模式和函数式编程的最佳理念。

函数式编程

函数式编程围绕着使用纯函数构建软件。纯函数不依赖于先前状态,并且对于传递的相同参数始终返回相同的结果。纯函数有助于避免与共享对象、可变数据和副作用相关的问题,这些问题在多线程环境中经常出现。

响应式编程

响应式编程指的是事件驱动的编程,其中数据流以异步方式进入并在到达时进行处理。

函数式响应式编程

RxJava 将这两个概念一起实现,其中流数据随时间变化,消费者函数相应地做出反应。

响应式宣言

响应式宣言 是一份在线文档,阐述了应用程序软件系统的较高标准。根据该宣言,响应式软件的主要属性如下:

  • **响应式** - 应该始终及时响应。

  • **消息驱动** - 组件之间应该使用异步消息传递,以便它们保持松耦合。

  • **弹性** - 即使在高负载下也应该保持响应。

  • **弹性** - 即使任何组件发生故障也应该保持响应。

RxJava 的关键组件

RxJava 具有两个关键组件:可观察对象和观察者。

  • **可观察对象** - 它表示类似于流的对象,可以发出零个或多个数据,可以发送错误消息,其速度可以在发出数据集时控制,可以发送有限数据和无限数据。

  • **观察者** - 它订阅可观察对象的序列数据并在每个可观察对象的项目上做出反应。每当可观察对象发出数据时,观察者都会收到通知。观察者逐个处理数据。

如果项目不存在或先前项目的回调未返回,则观察者永远不会收到通知。

RxJava - 环境设置

本地环境设置

RxJava 是一个 Java 库,因此首要要求是在您的机器上安装 JDK。

系统要求

JDK 1.5 或更高版本。
内存 没有最低要求。
磁盘空间 没有最低要求。
操作系统 没有最低要求。

步骤 1 - 验证您的机器上是否安装了 Java

首先,打开控制台并根据您正在使用的操作系统执行 java 命令。

操作系统 任务 命令
Windows 打开命令控制台 c:\> java -version
Linux 打开命令终端 $ java -version
Mac 打开终端 machine:< joseph$ java -version

让我们验证所有操作系统的输出:

操作系统 输出
Windows

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

Linux

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

Mac

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

如果您的系统上没有安装 Java,请从以下链接下载 Java 软件开发工具包 (SDK) https://www.oracle.com。在本教程中,我们假设 Java 1.8.0_101 为已安装版本。

步骤 2 - 设置 JAVA 环境

设置 **JAVA_HOME** 环境变量以指向 Java 安装在您机器上的基本目录位置。例如。

操作系统 输出
Windows 将环境变量 JAVA_HOME 设置为 C:\Program Files\Java\jdk1.8.0_101
Linux export JAVA_HOME = /usr/local/java-current
Mac export JAVA_HOME = /Library/Java/Home

将 Java 编译器位置追加到系统路径。

操作系统 输出
Windows 在系统变量 **Path** 的末尾追加字符串 **C:\Program Files\Java\jdk1.8.0_101\bin**。
Linux export PATH = $PATH:$JAVA_HOME/bin/
Mac 不需要

如上所述,使用命令 **java -version** 验证 Java 安装。

步骤 3 - 下载 RxJava2 存档

RxJava @ MVNRepository 下载最新版本的 RxJava jar 文件及其依赖项 Reactive Streams @ MVNRepository。在撰写本教程时,我们已下载 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar 并将其复制到 C:\>RxJava 文件夹中。

操作系统 存档名称
Windows rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Mac rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

步骤 4 - 设置 RxJava 环境

设置 **RX_JAVA** 环境变量以指向 RxJava jar 存储在您机器上的基本目录位置。假设我们将 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar 存储在 RxJava 文件夹中。

序号 操作系统和说明
1

Windows

将环境变量 RX_JAVA 设置为 C:\RxJava

2

Linux

export RX_JAVA = /usr/local/RxJava

3

Mac

export RX_JAVA = /Library/RxJava

步骤 5 - 设置 CLASSPATH 变量

设置 **CLASSPATH** 环境变量以指向 RxJava jar 位置。

序号 操作系统和说明
1

Windows

将环境变量 CLASSPATH 设置为 %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.;

2

Linux

export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

3

Mac

export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

步骤 6 - 测试 RxJava 设置

创建一个名为 TestRx.java 的类,如下所示:

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

步骤 7 - 验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac Tester.java

验证输出。

Hello World!

RxJava - 可观察对象的工作原理

**可观察对象** 表示数据源,而 **观察者(订阅者)** 则监听它们。简而言之,可观察对象发出项目,然后订阅者使用这些项目。

可观察对象

  • 可观察对象在订阅者开始监听时提供数据。

  • 可观察对象可以发出任意数量的项目。

  • 可观察对象也可以仅发出完成信号,而没有项目。

  • 可观察对象可以成功终止。

  • 可观察对象可能永远不会终止。例如,按钮可以被点击任意次数。

  • 可观察对象可能在任何时候抛出错误。

订阅者

  • 可观察对象可以有多个订阅者。

  • 当可观察对象发出项目时,每个订阅者的 onNext() 方法都会被调用。

  • 当可观察对象完成发出项目时,每个订阅者的 onComplete() 方法都会被调用。

  • 如果可观察对象发出错误,则每个订阅者的 onError() 方法都会被调用。

RxJava - 创建可观察对象

以下是创建可观察对象的基本类。

  • **Flowable** - 0..N 流,发出 0 或 n 个项目。支持 Reactive-Streams 和背压。

  • **Observable** - 0..N 流,但没有背压。

  • **Single** - 1 个项目或错误。可以视为方法调用的响应式版本。

  • **Completable** - 没有发出项目。用作完成或错误的信号。可以视为 Runnable 的响应式版本。

  • **MayBe** - 既不发出项目也不发出 1 个项目。可以视为 Optional 的响应式版本。

以下是 Observable 类中创建可观察对象的便捷方法。

  • **just(T item)** - 返回一个可观察对象,该对象发出给定的(常量引用)项目,然后完成。

  • **fromIterable(Iterable source)** - 将 Iterable 序列转换为一个 ObservableSource,该序列发出序列中的项目。

  • **fromArray(T... items)** - 将数组转换为一个 ObservableSource,该数组发出数组中的项目。

  • **fromCallable(Callable supplier)** - 返回一个可观察对象,当观察者订阅它时,它会调用您指定的函数,然后发出该函数返回的值。

  • **fromFuture(Future future)** - 将 Future 转换为 ObservableSource。

  • **interval(long initialDelay, long period, TimeUnit unit)** - 返回一个可观察对象,该对象在 initialDelay 后发出 0L,然后在每个时间段之后发出递增的数字。

RxJava - 单一可观察对象

Single 类表示单值响应。Single 可观察对象只能发出单个成功值或错误。它不发出 onComplete 事件。

类声明

以下是 **io.reactivex.Single<T>** 类的声明:

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

协议

以下是 Single 可观察对象操作的顺序协议:

onSubscribe (onSuccess | onError)?

Single 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Hello World

RxJava - 可能的可观察对象

MayBe 类表示延迟响应。MayBe 可观察对象可以发出单个成功值或不发出任何值。

类声明

以下是 **io.reactivex.Single<T>** 类的声明:

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

协议

以下是 MayBe 可观察对象操作的顺序协议:

onSubscribe (onSuccess | onError | OnComplete)?

MayBe 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Hello World

RxJava - 可完成的可观察对象

Completable 类表示延迟响应。Completable 可观察对象可以指示成功完成或错误。

类声明

以下是 **io.reactivex.Completable** 类的声明:

public abstract class Completable
extends Object
implements CompletableSource

协议

以下是 Completable 可观察对象操作的顺序协议:

onSubscribe (onError | onComplete)?

Completable 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Started!
Done!

RxJava - 使用 CompositeDisposable

CompositeDisposable 类表示一个容器,该容器可以保存多个可丢弃对象,并且提供添加和删除可丢弃对象的 O(1) 复杂度。

类声明

以下是 **io.reactivex.disposables.CompositeDisposable** 类的声明:

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Hello World
Hi

RxJava - 创建运算符

以下是用于创建可观察对象的运算符。

序号。 运算符和描述
1

创建

从头开始创建可观察对象,并允许以编程方式调用观察者方法。

2

延迟

在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的 Observable。

3

空/永不/抛出

创建一个具有有限行为的 Observable。

4

将对象/数据结构转换为 Observable。

5

间隔

创建一个 Observable,以指定的时间间隔依次发出整数。

6

将对象/数据结构转换为 Observable 以发出相同或相同类型的对象。

7

范围

创建一个 Observable,以给定范围依次发出整数。

8

重复

创建一个 Observable,重复依次发出整数。

9

开始

创建一个 Observable 以发出函数的返回值。

10

定时器

创建一个 Observable,在给定延迟后发出单个项目。

创建运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

ABCDEFG

RxJava - 转换运算符

以下是用于转换从 Observable 发出的项目的运算符。

序号。 运算符和描述
1

缓冲区

定期将 Observable 中的项目收集到捆绑包中,然后发出捆绑包而不是项目。

2

扁平映射

用于嵌套的 Observable。将项目转换为 Observable。然后将项目展平为单个 Observable。

3

分组依据

将 Observable 分为一组由键组织的 Observable,以发出不同组的项目。

4

地图

对每个发出的项目应用一个函数来转换它。

5

扫描

依次对每个发出的项目应用一个函数,然后发出后续的值。

6

窗口

定期将 Observable 中的项目收集到 Observable 窗口中,然后发出窗口而不是项目。

转换运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

ABCDEFG

RxJava - 过滤运算符

以下是用于选择性地从 Observable 发出项目(s)的运算符。

序号。 运算符和描述
1

去抖动

仅在超时发生时发出项目,而不会发出另一个项目。

2

独特

仅发出唯一项目。

3

ElementAt

仅发出 Observable 发出的第 n 个索引处的项目。

4

过滤器

仅发出通过给定谓词函数的那些项目。

5

第一

发出第一个项目或通过给定条件的第一个项目。

6

忽略元素

不要从 Observable 发出任何项目,但标记完成。

7

最后

从 Observable 发出最后一个元素。

8

样本

以给定的时间间隔发出最新的项目。

9

跳过

跳过 Observable 中的前 n 个项目。

10

跳过最后

跳过 Observable 中的最后 n 个项目。

11

获取

从 Observable 中获取前 n 个项目。

12

获取最后

从 Observable 中获取最后 n 个项目。

过滤运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

ab

RxJava - 组合运算符

以下是用于从多个 Observable 创建单个 Observable 的运算符。

序号。 运算符和描述
1 和/然后/何时

使用模式和计划中介组合项目集。

2 组合最新

通过指定的函数组合每个 Observable 发出的最新项目,并发出结果项目。

3 加入

如果在第二个 Observable 发出的项目的时段内发出,则组合两个 Observable 发出的项目。

4 合并

组合 Observable 发出的项目。

5 以…开始

在开始发出源 Observable 的项目之前发出指定的项目序列

6 切换

发出 Observable 发出的最新项目。

7 拉链

基于函数组合 Observable 的项目,并发出结果项目。

组合运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

g1g2g3g4g5g6

RxJava - 实用运算符

以下是通常与 Observable 有用的运算符。

序号。 运算符和描述
1

延迟

注册操作以处理 Observable 生命周期事件。

2

具体化/非具体化

表示发出的项目和发送的通知。

3

ObserveOn

指定要观察的调度程序。

4

序列化

强制 Observable 进行序列化调用。

5

订阅

对项目的排放和通知(如来自 Observable 的完成)进行操作

6

SubscribeOn

指定 Observable 订阅时使用的调度程序。

7

时间间隔

转换 Observable 以发出排放之间经过的时间量的指示。

8

超时

如果指定的时间发生而没有发出任何项目,则发出错误通知。

9

时间戳

将时间戳附加到每个发出的项目。

9

使用

创建一个可处置的资源或与 Observable 具有相同的生命周期。

实用程序运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

abcdefg

RxJava - 条件运算符

以下是评估一个或多个 Observable 或发出的项目的运算符。

序号。 运算符和描述
1

全部

评估所有发出的项目以满足给定条件。

2

模棱两可

仅在给定多个 Observable 时发出第一个 Observable 的所有项目。

3

包含

检查 Observable 是否发出特定项目。

4

默认如果为空

如果 Observable 没有发出任何内容,则发出默认项目。

5

序列相等

检查两个 Observable 是否发出相同的项目序列。

6

跳过直到

丢弃第一个 Observable 发出的项目,直到第二个 Observable 发出项目。

7

跳过一段时间

丢弃 Observable 发出的项目,直到给定条件变为假。

8

获取直到

在第二个 Observable 发出项目或终止后丢弃 Observable 发出的项目。

9

获取一段时间

在指定条件变为假后丢弃 Observable 发出的项目。

条件运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

No Data
a

RxJava - 数学运算符

以下是对 Observable 发出的整个项目进行操作的运算符。

序号。 运算符和描述
1

平均

评估所有项目的平均值并发出结果。

2

连接

从多个 Observable 发出所有项目,而无需交错。

3

计数

计算所有项目并发出结果。

4

最大值

评估所有项目中最大值项目并发出结果。

5

最小值

评估所有项目中最小值项目并发出结果。

6

减少

对每个项目应用一个函数并返回结果。

7

总和

评估所有项目的总和并发出结果。

数学运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

abcdefg123456

RxJava - 可连接运算符

以下是更精确地控制订阅的运算符。

序号。 运算符和描述
1

连接

指示可连接的 Observable 向其订阅者发出项目。

2

发布

将 Observable 转换为可连接的 Observable。

3

RefCount

将可连接的 Observable 转换为普通 Observable。

4

重播

确保每个订阅者都能看到相同的发出项目序列,即使在 Observable 开始发出项目并且订阅者稍后订阅之后也是如此。

可连接运算符示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

0
7
abcdefg

RxJava - 主题

根据反应式,Subject 可以同时充当 Observable 和 Observer。

Subject 是一种桥梁或代理,在 ReactiveX 的某些实现中可用,它既充当观察者又充当 Observable。因为它是一个观察者,所以它可以订阅一个或多个 Observable,并且因为它是一个 Observable,所以它可以通过重新发出它观察到的项目来传递这些项目,它还可以发出新项目。

有四种类型的 Subject:

序号。 Subject 和描述
1

发布 Subject

仅发出订阅时间之后发出的那些项目。

2 重播 Subject

发出源 Observable 发出的所有项目,无论何时订阅 Observable。

3

行为 Subject

订阅后,发出最新的项目,然后继续发出源 Observable 发出的项目。

4

异步 Subject

在完成发出后,发出源 Observable 发出的最后一个项目。

RxJava - PublishSubject

PublishSubject 将项目发出给当前已订阅的观察者,并将终止事件发出给当前或之后的观察者。

类声明

以下是io.reactivex.subjects.PublishSubject<T>类的声明:

public final class PublishSubject<T>
extends Subject<T>

PublishSubject 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

abcd
d

RxJava - BehaviorSubject

BehaviorSubject 发出它观察到的最新项目,然后将所有后续观察到的项目发出给每个已订阅的观察者。

类声明

以下是io.reactivex.subjects.BehaviorSubject<T>类的声明:

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubject 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

abcd
cd

RxJava - ReplaySubject

ReplaySubject 将事件/项目重播给当前和之后的观察者。

类声明

以下是io.reactivex.subjects.ReplaySubject<T>类的声明:

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

abcd
abcd

RxJava - AsyncSubject

AsyncSubject 仅发出最后一个值,然后是完成事件或接收到的错误给观察者。

类声明

以下是io.reactivex.subjects.AsyncSubject<T>类的声明:

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

d
d

RxJava - 调度器

调度程序用于多线程环境中处理 Observable 运算符。

根据反应式,Scheduler 用于安排运算符链如何应用于不同的线程。

默认情况下,Observable 和您应用于它的运算符链将在调用其 Subscribe 方法的同一线程上执行其工作并通知其观察者。SubscribeOn 运算符通过指定 Observable 应在其上操作的不同 Scheduler 来更改此行为。ObserveOn 运算符指定 Observable 用于向其观察者发送通知的不同 Scheduler。

RxJava 中有以下类型的调度程序:

序号。 调度程序和描述
1

Schedulers.computation()

创建并返回一个用于计算工作的调度程序。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。

2

Schedulers.io()

创建并返回一个用于 IO 绑定工作的调度程序。线程池可以根据需要扩展。

3

Schedulers.newThread()

创建并返回一个为每个工作单元创建一个新线程的调度程序。

4

Schedulers.trampoline()

创建并返回一个在当前线程上将工作排队以在当前工作完成后执行的调度程序。

4

Schedulers.from(java.util.concurrent.Executor executor)

将 Executor 转换为新的 Scheduler 实例。

RxJava - Trampoline 调度器

Schedulers.trampoline() 方法创建并返回一个在当前线程上将工作排队以在当前工作完成后执行的调度程序。

Schedulers.trampoline() 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava - NewThread 调度器

Schedulers.newThread() 方法创建并返回一个为每个工作单元创建一个新线程的调度程序。

Schedulers.newThread() 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava - Computation 调度器

Schedulers.computation() 方法创建并返回一个用于计算工作的调度程序。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。

Schedulers.computation() 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava - IO 调度器

Schedulers.io() 方法创建并返回一个用于 IO 绑定工作的调度程序。线程池可以根据需要扩展。最适合 I/O 密集型操作。

Schedulers.io() 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava - 来自调度器

Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。

Schedulers.from(Executor) 示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava - 缓冲

缓冲操作符允许将 Observable 发出的项目收集到列表或捆绑包中,并发出这些捆绑包而不是项目本身。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓冲,3 个项目将一起发出。

缓冲示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done! 

RxJava - 窗口

窗口操作符的工作原理类似于缓冲操作符,但它允许将 Observable 发出的项目收集到另一个 Observable 中,而不是集合中,并发出这些 Observable 而不是集合。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口操作符,3 个 Observable 将一起发出。

窗口示例

使用您选择的任何编辑器在 C:\> RxJava 中创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

验证结果

使用 **javac** 编译器编译类,如下所示:

C:\RxJava>javac ObservableTester.java

现在运行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它应该产生以下输出:

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done! 
广告