gRPC - 客户端流式RPC
现在让我们看看使用 gRPC 通信时客户端流是如何工作的。在这种情况下,客户端将搜索并将图书添加到购物车。一旦客户端完成添加所有图书,服务器将向客户端提供结账购物车价值。
.proto 文件
首先,让我们在 **common_proto_files** 中定义 **bookstore.proto** 文件:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
这里,以下代码块表示服务的名称 **"BookStore"** 和可以调用的函数名 **"totalCartValue"**。“totalCartValue”函数接收类型为 **"Book"** 的输入,这是一个流。该函数返回类型为 "Cart" 的对象。因此,实际上,我们允许客户端以流式方式添加图书,一旦客户端完成,服务器就会向客户端提供总购物车价值。
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
现在让我们看看这些类型。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
客户端将发送它想要购买的 **"Book"**。它可能不是完整的图书信息;它可以只是图书的标题。
message Cart {
int32 books = 1;
int32 price = 2;
}
服务器在获取图书列表后,将返回 **"Cart"** 对象,它只是客户端购买的图书总数和总价格。
请注意,我们已经完成了 Maven 设置,用于自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在:
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 **proto** 文件,让我们设置一个可以服务于这些函数调用的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book)
logger.info("Searching for book with title starting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 **proto** 文件中编写的函数和服务。让我们一起浏览上面的代码:
从 **main** 方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中,是 **BookStore** 服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中,是 **BookStoreImpl**。
服务实例需要提供 **.proto 文件** 中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。
现在,鉴于这是客户端流的情况,当客户端添加图书时,服务器将获得一个 Book 列表(在 **proto** 文件中定义)。因此,服务器返回一个 **自定义流观察者**。此流观察者实现了当找到新书时会发生什么以及流关闭时会发生什么。
当客户端添加一本 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车。在流式传输的情况下,服务器不会等待所有可用的图书。
当客户端完成添加图书时,将调用流观察者的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加图书时想要发送的内容,即它将 Cart 对象返回给客户端。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientStreamingClient {
private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
private final BookStoreStub stub;
private boolean serverResponseCompleted = false;
StreamObserver<Book> streamClientSender;
public BookStoreClientStreamingClient(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver<Cart>(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() +
"\nTotal Order Value:" + cart.getPrice());
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
serverResponseCompleted = true;
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.addBook(bookName);
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们一起浏览上面的代码:
从 **main** 方法开始,我们接受一个参数,即我们要搜索的图书标题。
我们为与服务器的 gRPC 通信设置一个 Channel。
接下来,我们使用创建的 channel 创建一个 **非阻塞存根**。在这里,我们选择我们要调用其函数的服务 **"BookStore"**。
然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中是 **Book**,并且我们添加了我们希望服务器添加的标题。
但是鉴于这是客户端流的情况,我们首先为服务器创建一个流观察者。此服务器流观察者列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**。
并且使用存根,我们还获得了客户端流观察者。我们使用此流观察者将数据(即 Book)发送到添加到购物车。最终,我们进行调用并在有效的图书上获得迭代器。当我们迭代时,我们得到服务器提供的相应图书。
一旦我们的订单完成,我们确保客户端流观察者关闭。它告诉服务器计算购物车价值并将其作为输出提供。
最后,我们关闭 channel 以避免任何资源泄漏。
所以,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端通过向服务器通知它们来添加一系列图书。
服务器在其商店中搜索图书并将它们添加到购物车。
当客户端完成订购时,服务器会响应客户端的总购物车价值。
现在,我们已经定义了我们的 **proto** 文件,编写了我们的服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出:
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
以上输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingClient
让我们向我们的客户端添加一些图书。
Type book name to be added to the cart.... Gr Jul 24, 2021 5:53:07 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... Pa Jul 24, 2021 5:53:20 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Passage Type book name to be added to the cart....
一旦我们添加了图书并且输入“EXIT”,服务器然后计算购物车价值,以下是我们得到的输出:
输出
EXIT Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient completeOrder INFO: Done, waiting for server to create order summary... Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
因此,正如我们所看到的,客户端能够添加图书。一旦所有图书都被添加,服务器将响应图书总数和总价格。