如何在 Java 9 中使用 Flow API 来实现响应式流?
自 Java 9 起,Flow API 便是响应式流规范的官方支持。它是 **Iterator** 和 **Observer** 这两个模式的结合。Flow API 是交互操作规范,并非像 RxJava 那样的最终用户 API。
Flow API 由四个基本接口组成
- 订阅者:订阅者订阅发布者以进行回调。
- 发布者:发布者将数据项流发布给已注册的订阅者。
- 订阅:发布者和订阅者之间的链接。
- 处理器:处理器位于发布者和订阅者之间,将一个流转换到另一个流。
在以下示例中,我们创建了一个基本订阅者,请求一个数据对象、打印它并请求另一个。我们可以使用 Java 提供的发布者实现(SubmissionPublisher)来完成会话。
示例
import java.util.concurrent.Flow; import java.util.List; import java.util.concurrent.SubmissionPublisher; class MySubscriber<T>implements Flow.Subscriber<T> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(T item) { System.out.println(item); subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); } } // main class public class FlowTest { public static void main(String args[]) { List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); publisher.subscribe(new MySubscriber<>()); items.forEach(s -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } publisher.submit(s); }); publisher.close(); } }
输出
1 2 3 4 5 6 7 8 9 10 Done
广告