如何在 Java 9 中使用 Flow API 来实现响应式流?
自 Java 9 起,Flow API 便是响应式流规范的官方支持。它是 **Iterator** 和 **Observer** 这两个模式的结合。Flow API 是交互操作规范,并非像 RxJava 那样的最终用户 API。
Flow API 由四个基本接口组成
- 订阅者:订阅者订阅发布者以进行回调。
- 发布者:发布者将数据项流发布给已注册的订阅者。
- 订阅:发布者和订阅者之间的链接。
- 处理器:处理器位于发布者和订阅者之间,将一个流转换到另一个流。
在以下示例中,我们创建了一个基本订阅者,请求一个数据对象、打印它并请求另一个。我们可以使用 Java 提供的发布者实现(SubmissionPublisher)来完成会话。
示例
import java.util.concurrent.Flow;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
class MySubscriber<T>implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
// main class
public class FlowTest {
public static void main(String args[]) {
List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new MySubscriber<>());
items.forEach(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
publisher.submit(s);
});
publisher.close();
}
}输出
1 2 3 4 5 6 7 8 9 10 Done
广告
数据结构
网络
RDBMS
操作系统
Java
iOS
HTML
CSS
Android
Python
C 编程
C++
C#
MongoDB
MySQL
JavaScript
PHP