如何在 Java 9 中使用 Flow API 来实现响应式流?


自 Java 9 起,Flow API 便是响应式流规范的官方支持。它是 **Iterator** 和 **Observer** 这两个模式的结合。Flow API 是交互操作规范,并非像 RxJava 那样的最终用户 API。

Flow API 由四个基本接口组成

  • 订阅者:订阅者订阅发布者以进行回调。
  • 发布者:发布者将数据项流发布给已注册的订阅者。
  • 订阅:发布者和订阅者之间的链接。
  • 处理器:处理器位于发布者和订阅者之间,将一个流转换到另一个流。

在以下示例中,我们创建了一个基本订阅者,请求一个数据对象、打印它并请求另一个。我们可以使用 Java 提供的发布者实现(SubmissionPublisher)来完成会话。

示例

import java.util.concurrent.Flow;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;

class MySubscriber<T>implements Flow.Subscriber<T> {
   private Flow.Subscription subscription;
   @Override
   public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      this.subscription.request(1);
   }
   @Override
   public void onNext(T item) {
      System.out.println(item);
      subscription.request(1);
   }
   @Override
   public void onError(Throwable throwable) {
      throwable.printStackTrace();
   }
   @Override
   public void onComplete() {
      System.out.println("Done");
   }
}

// main class
public class FlowTest {
   public static void main(String args[]) {
      List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
      publisher.subscribe(new MySubscriber<>());
      items.forEach(s -> {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         publisher.submit(s);
      });
      publisher.close();
   }
}

输出

1
2
3
4
5
6
7
8
9
10
Done

更新于:2020-03-27

2K+ 浏览

启动您的 职业生涯

完成课程获得认证

开始
广告