gRPC - 快速指南
gRPC - 简介
在深入了解 gRPC 之前,让我们先简单了解一下远程过程调用,也就是 gRPC 所做的事情。
什么是远程过程调用?
远程过程调用是看起来像普通/本地函数调用一样的函数调用,但不同之处在于远程函数调用通常在不同的机器上执行。但是,对于编写代码的开发人员来说,函数调用和远程调用之间的区别很小。这些调用通常遵循客户端-服务器模型,其中执行调用的机器充当服务器。
为什么我们需要远程过程调用?
远程过程调用提供了一种在另一台机器上执行代码的方法。在大型、庞大的产品中,这变得至关重要,因为单台机器无法承载产品正常运行所需的所有代码。
在微服务架构中,应用程序被分解成小的服务,这些服务通过消息队列和 API 相互通信。所有这些通信都发生在网络上,不同的机器/节点根据它们所承载的服务提供不同的功能。因此,在分布式环境中工作时,创建远程过程调用成为一个关键方面。
为什么选择 gRPC?
Google 远程过程调用 (gRPC) 提供了一个执行远程过程调用的框架。但是,还有一些其他库和机制可以在远程机器上执行代码。那么,是什么让 gRPC 如此特别呢?让我们来了解一下。
语言无关性 - gRPC 在内部使用 Google Protocol Buffer。因此,可以使用多种语言,例如 Java、Python、Go、Dart 等。Java 客户端可以进行过程调用,而使用 Python 的服务器可以做出响应,从而有效地实现语言无关性。
高效的数据压缩 - 在微服务环境中,考虑到网络上会发生多次通信,因此我们发送的数据尽可能简洁至关重要。我们需要避免任何多余的数据,以确保数据快速传输。鉴于 gRPC 在内部使用 Google Protocol Buffer,因此它可以利用此功能。
高效的序列化和反序列化 - 在微服务环境中,考虑到网络上会发生多次通信,因此我们尽可能快速地序列化和反序列化数据至关重要。鉴于 gRPC 在内部使用 Google Protocol Buffer,因此它可以确保快速序列化和反序列化数据。
易于使用 - gRPC 已经拥有一个库和插件,可以自动生成过程代码(我们将在后续章节中看到)。对于简单的用例,它可以像本地函数调用一样使用。
gRPC 与使用 JSON 的 REST 的比较
让我们看看其他通过网络传输数据的方式与 Protobuf 相比如何。
功能 | gRPC | 使用 JSON/XML 的 HTTP |
---|---|---|
语言无关性 | 是 | 是 |
HTTP 版本 | HTTP/2 | HTTP 1.1 |
指定域模式 | .proto 文件(Google Protocol Buffer) | 无 |
序列化数据大小 | 最小 | 高(XML 更高) |
人类可读性 | 否,因为它使用单独的编码模式 | 是,因为它使用基于文本的格式 |
序列化速度 | 最快 | 较慢(XML 最慢) |
数据类型支持 | 更丰富。支持复杂数据类型,如 Any、oneof 等。 | 支持基本数据类型 |
支持演变模式 | 是 | 否 |
gRPC - 设置
Protoc 设置
请注意,此设置仅适用于 Python。对于 Java,所有这些都由 Maven 文件处理。让我们安装“proto”二进制文件,我们将使用它来自动生成“.proto”文件的代码。二进制文件可以在 https://github.com/protocolbuffers/protobuf/releases/ 中找到。根据操作系统选择正确的二进制文件。我们将在 Windows 上安装 proto 二进制文件,但 Linux 的步骤差别不大。
安装完成后,确保您可以通过命令行访问它 -
protoc --version libprotoc 3.15.6
这意味着 Protobuf 已正确安装。现在,让我们转到项目结构。
我们还需要设置 gRPC 代码生成所需的插件。
对于 Python,我们需要执行以下命令 -
python -m pip install grpcio python -m pip install grpcio-tools
它将安装所有必需的二进制文件并将它们添加到路径中。
项目结构
以下是我们将拥有的整体项目结构 -
与各个语言相关的代码进入各自的目录。我们将有一个单独的目录来存储我们的 proto 文件。并且,以下是我们将用于 Java 的项目结构 -
项目依赖项
现在我们已经安装了protoc,我们可以使用protoc从 proto 文件自动生成代码。让我们先创建一个 Java 项目。
以下是我们将用于 Java 项目的 Maven 配置。请注意,它还包含 Protobuf 所需的库。
示例
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.grpc.point</groupId> <artifactId>grpc-point</artifactId> <version>1.0</version> <packaging>jar</packaging> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.38.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.38.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.38.0</version> </dependency> <dependency> <!-- necessary for Java 9+ --> <groupId>org.apache.tomcat</groupId> <artifactId>annotations-api</artifactId> <version>6.0.53</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4jsimple--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> </dependencies> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.6.2</version> </extension> </extensions> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> <version>1.1</version> <executions> <execution> <id>test</id> <phase>generate-sources</phase> <goals> <goal>add-source</goal> </goals> <configuration> <sources> <source>${basedir}/target/generated-sources</source> </sources> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpcjava:1.38.0:exe:${os.detected.classifier}</pluginArtifact> <protoSourceRoot>../common_proto_files</protoSourceRoot> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <configuration> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
gRPC - 使用 Java 的 Hello World 应用
现在让我们创建一个基本的类似“Hello World”的应用程序,它将使用 gRPC 和 Java。
.proto 文件
首先让我们在common_proto_files中定义“greeting.proto”文件 -
syntax = "proto3"; option java_package = "com.tp.greeting"; service Greeter { rpc greet (ClientInput) returns (ServerOutput) {} } message ClientInput { string greeting = 1; string name = 2; } message ServerOutput { string message = 1; }
现在让我们仔细看看这个代码块,并了解每一行的作用 -
syntax = "proto3";
此处的“syntax”表示我们正在使用的 Protobuf 版本。因此,我们使用最新版本 3,因此模式可以使用所有对版本 3 有效的语法。
package tutorial;
此处的包用于冲突解决,例如,如果我们有多个名称相同的类/成员。
option java_package = "com.tp.greeting";
此参数特定于 Java,即从“.proto”文件自动生成代码的包。
service Greeter { rpc greet(ClientInput) returns (ServerOutput) {} }
此块表示服务“Greeter”的名称和可以调用的函数名称“greet”。“greet”函数接收类型为“ClientInput”的输入并返回类型为“ServerOutput”的输出。现在让我们看看这些类型。
message ClientInput { string greeting = 1; string name = 2; }
在上面的代码块中,我们定义了ClientInput,它包含两个属性“greeting”和“name”,它们都是字符串。客户端应该将类型为“ClientInput”的对象发送到服务器。
message ServerOutput { string message = 1; }
在上面的代码块中,我们定义了,给定一个“ClientInput”,服务器将返回“ServerOutput”,它将包含一个名为“message”的属性。服务器应该将类型为“ServerOutput”的对象发送到客户端。
请注意,我们已经完成了 Maven 设置以自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目 -
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.greeting Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.greeting
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以服务于调用这些函数的服务器。
让我们编写服务器代码来服务于上述函数,并将其保存在com.tp.grpc.GreetServer.java中 -
示例
package com.tp.grpc; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ClientInput; import com.tp.greeting.Greeting.ServerOutput; public class GreetServer { private static final Logger logger = Logger.getLogger(GreetServer.class.getName()); private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } static class GreeterImpl extends GreeterGrpc.GreeterImplBase { @Override public void greet(ClientInput req, StreamObserver<ServerOutput> responseObserver) { logger.info("Got request from client: " + req); ServerOutput reply = ServerOutput.newBuilder().setMessage( "Server says " + "\"" + req.getGreeting() + " " + req.getName() + "\"" ).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); } } public static void main(String[] args) throws IOException, InterruptedException { final GreetServer greetServer = new GreetServer(); greetServer.start(); greetServer.server.awaitTermination(); } }
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中,是Greeter服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中,是GreeterImpl。
服务实例需要提供“.proto”文件中存在的函数/方法的实现,即在我们的例子中,是greet方法。
该方法期望一个类型与“.proto”文件中定义的类型相同的对象,即在我们的例子中,是ClientInput。
该方法处理上述输入,执行计算,然后应该在“.proto”文件中返回提到的输出,即在我们的例子中,是ServerOutput。
最后,我们还有一个shutdown钩子,以确保在完成代码执行后服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写客户端代码来调用上述函数,并将其保存在com.tp.grpc.GreetClient.java中 -
示例
package com.tp.grpc; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class GreetClient { private static final Logger logger = Logger.getLogger(GreetClient.class.getName()); private final GreeterGrpc.GreeterBlockingStub blockingStub; public GreetClient(Channel channel) { blockingStub = GreeterGrpc.newBlockingStub(channel); } public void makeGreeting(String greeting, String username) { logger.info("Sending greeting to server: " + greeting + " for name: " + username); ClientInput request = ClientInput.newBuilder().setName(username).setGreeting(greeting).build(); logger.info("Sending to server: " + request); ServerOutput response; try { response = blockingStub.greet(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Got following from the server: " + response.getMessage()); } public static void main(String[] args) throws Exception { String greeting = args[0]; String username = args[1]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { GreetClient client = new GreetClient(channel); client.makeGreeting(greeting, username); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们接受两个参数,即name和greeting。
我们为 gRPC 与服务器的通信设置了一个 Channel。
接下来,我们使用创建的 Channel 创建一个阻塞式存根。在这里,我们拥有服务“Greeter”,我们计划调用它的函数。存根只不过是一个包装器,它隐藏了远程调用的复杂性,使调用者无需关心。
然后,我们简单地创建“.proto”文件中定义的预期输入,即在我们的例子中,是ClientInput。
我们最终进行调用并等待服务器的响应。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际效果。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetServer
我们将看到以下输出 -
输出
Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetClient Hello Jane
我们将看到以下输出 -
输出
Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet INFO: Sending greeting to server: Hello for name: Jane Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet INFO: Sending to server: greeting: "Hello" name: "Jane" Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetClient greet INFO: Got following from the server: Server says "Hello Jane"
现在,如果我们打开服务器日志,我们将看到以下内容 -
Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start INFO: Server started, listening on 50051 Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetServer$GreeterImpl greet INFO: Got request from client: greeting: "Hello" name: "Jane"
因此,客户端能够按预期调用服务器,并且服务器通过向客户端发送问候进行了响应。
gRPC - 使用 Python 的 Hello World 应用
现在让我们创建一个基本的类似“Hello World”的应用程序,它将使用 gRPC 和 Python。
.proto 文件
首先让我们在common_proto_files中定义greeting.proto文件 -
syntax = "proto3"; service Greeter { rpc greet (ClientInput) returns (ServerOutput) {} } message ClientInput { string greeting = 1; string name = 2; } message ServerOutput { string message = 1; }
现在让我们仔细看看上面代码块中的每一行 -
syntax = "proto3";
此处的“syntax”表示我们正在使用的 Protobuf 版本。因此,我们使用最新版本 3,因此模式可以使用所有对版本 3 有效的语法。
package tutorial;
此处的package用于冲突解决,例如,如果我们有多个名称相同的类/成员。
service Greeter { rpc greet(ClientInput) returns (ServerOutput) {} }
此块表示服务“Greeter”的名称和可以调用的函数名称“greet”。“greet”函数接收类型为“ClientInput”的输入并返回类型为“ServerOutput”的输出。现在让我们看看这些类型。
message ClientInput { string greeting = 1; string name = 2; }
在上面的代码块中,我们定义了ClientInput,它包含两个属性“greeting”和“name”,它们都是字符串。客户端应该将类型为“ClientInput”的对象发送到服务器。
message ServerOutput { string message = 1; }
这里,我们还定义了,给定一个“ClientInput”,服务器将返回一个包含单个属性“message”的“ServerOutput”。服务器应该将类型为“ServerOutput”的对象发送到客户端。
现在,让我们为 Protobuf 类和 gRPC 类生成底层代码。为此,我们需要执行以下命令:
python -m grpc_tools.protoc -I ..\common_proto_files\ -- python_out=../python --grpc_python_out=. greeting.proto
但是,请注意,要执行该命令,我们需要安装教程的设置部分中提到的正确依赖项。
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: python/greeting_pb2.py Protobuf gRPC code: python/greeting_pb2_grpcpb2.py
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在server.py中:
示例
from concurrent import futures import grpc import greeting_pb2 import greeting_pb2_grpc class Greeter(greeting_pb2_grpc.GreeterServicer): def greet(self, request, context): print("Got request " + str(request)) return greeting_pb2.ServerOutput(message='{0} {1}!'.format(request.greeting, request.name)) def server(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=2)) greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) server.add_insecure_port('[::]:50051') print("gRPC starting") server.start() server.wait_for_termination() server()
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们在指定的端口上创建一个 gRPC 服务器。
但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中,是Greeter服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为Greeter。
服务实例需要提供.proto文件中存在的 method/function 的实现,即在我们的例子中为greet方法。
该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是request。
该方法处理上述输入,进行计算,然后应该返回.proto文件中提到的输出,即在我们的例子中为ServerOutput。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在client.py中:
示例
import grpc import greeting_pb2 import greeting_pb2_grpc def run(): with grpc.insecure_channel('localhost:50051') as channel: stub = greeting_pb2_grpc.GreeterStub(channel) response = stub.greet(greeting_pb2.ClientInput(name='John', greeting = "Yo")) print("Greeter client received following from server: " + response.message) run()
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们为与服务器的 gRPC 通信设置了一个 Channel。
然后,我们使用 channel 创建一个stub。在这里,我们使用服务“Greeter”,我们计划调用其函数。stub 只是一个包装器,它隐藏了远程调用对调用者的复杂性。
然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为ClientInput。我们硬编码了两个参数,即name和greeting。
我们最终进行调用并等待服务器的结果。
因此,这就是我们的客户端代码。
客户端服务器调用
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
python .\server.py
输出
我们将得到以下输出:
gRPC starting
上述输出表示服务器已启动。
现在,让我们启动客户端。
python .\client.py
我们将看到以下输出 -
输出
Greeter client received following from server: Yo John!
现在,如果我们打开服务器日志,我们将看到以下数据:
gRPC starting Got request greeting: "Yo" name: "John"
因此,正如我们所看到的,客户端能够按预期调用服务器,并且服务器响应了向客户端回送问候。
gRPC - 一元 gRPC
我们现在将了解 gRPC 框架支持的各种类型的通信。我们将使用书店的例子,客户可以在其中搜索和下单送书。
让我们看看一元 gRPC 通信,我们让客户端搜索标题并随机返回一个与查询的标题匹配的书籍。
.proto 文件
首先让我们在 common_proto_files 中定义 bookstore.proto 文件:
syntax = "proto3"; option java_package = "com.tp.bookstore"; service BookStore { rpc first (BookSearch) returns (Book) {} } message BookSearch { string name = 1; string author = 2; string genre = 3; } message Book { string name = 1; string author = 2; int32 price = 3; }
现在让我们仔细看看上面代码块中的每一行。
syntax = "proto3";
语法
这里的“语法”表示我们使用的 Protobuf 版本。我们使用最新的版本 3,因此该模式可以使用所有对版本 3 有效的语法。
package tutorial;
此处的package用于冲突解决,例如,如果我们有多个名称相同的类/成员。
option java_package = "com.tp.bookstore";
此参数特定于 Java,即从 .proto 文件自动生成代码的包。
service BookStore { rpc first (BookSearch) returns (Book) {} }
这表示服务的名称"BookStore"和可以调用的函数名称"first"。"first"函数接收类型为"BookSearch"的输入并返回类型为"Book"的输出。因此,实际上,我们让客户端搜索标题并返回一个与查询的标题匹配的书籍。
现在让我们看看这些类型。
message BookSearch { string name = 1; string author = 2; string genre = 3; }
在上面的代码块中,我们定义了 BookSearch,它包含诸如name、author和genre之类的属性。客户端应该将类型为"BookSearch"的对象发送到服务器。
message Book { string name = 1; string author = 2; int32 price = 3; }
这里,我们还定义了,给定一个“BookSearch”,服务器将返回一个包含书籍属性以及书籍价格的“Book”。服务器应该将类型为“Book”的对象发送到客户端。
请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerUnary.java中:
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; public class BookeStoreServerUnary { private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException, InterruptedException { final BookeStoreServerUnary greetServer = new BookeStoreServerUnary(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) { logger.info("Searching for book with title: " + searchQuery.getName()); List<String> matchingBookTitles = bookMap.keySet().stream().filter(title -> title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList()); Book foundBook = null; if(matchingBookTitles.size() > 0) { foundBook = bookMap.get(matchingBookTitles.get(0)); } responseObserver.onNext(foundBook); responseObserver.onCompleted(); } } }
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl
服务实例需要提供.proto文件中存在的 method/function 的实现,即在我们的例子中为first方法。
该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是BookSearch
该方法在可用的 bookMap 中搜索书籍,然后通过调用onNext()方法返回Book。完成后,服务器通过调用onCompleted()宣布它已完成输出。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientUnaryBlocking.java中:
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientUnaryBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName()); private final BookStoreGrpc.BookStoreBlockingStub blockingStub; public BookStoreClientUnaryBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); } public void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; try { response = blockingStub.first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Got following book from server: " + response); } public static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientUnaryBlocking client = new BookStoreClientUnaryBlocking(channel); client.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。
我们为与服务器的 gRPC 通信设置了一个 Channel。
然后,我们使用 channel 创建一个阻塞 stub。在这里,我们选择服务“BookStore”,我们计划调用其函数。“stub”只是一个包装器,它隐藏了远程调用对调用者的复杂性。
然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为BookSearch,并添加我们希望服务器搜索的标题名称。
我们最终进行调用并等待服务器的结果。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端查询服务器以获取具有给定名称/标题的书籍。
服务器在其商店中搜索书籍。
服务器然后响应书籍及其其他属性。
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnary
我们将看到以下输出 -
输出
Jul 03, 2021 7:21:58 PM com.tp.bookstore.BookeStoreServerUnary start INFO: Server started, listening on 50051
上述输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"
我们将看到以下输出 -
输出
Jul 03, 2021 7:22:03 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Querying for book with title: To Kill Jul 03, 2021 7:22:04 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Got following book from server: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。
gRPC - 服务器端流式 RPC
现在让我们讨论在使用 gRPC 通信时服务器流是如何工作的。在这种情况下,客户端将搜索具有给定作者的书籍。假设服务器需要一些时间才能遍历所有书籍。服务器不是在遍历完所有书籍后等待提供所有书籍,而是会以流式方式提供书籍,即一旦找到一本就提供一本。
.proto 文件
首先让我们在 common_proto_files 中定义 bookstore.proto 文件:
syntax = "proto3"; option java_package = "com.tp.bookstore"; service BookStore { rpc first (BookSearch) returns (stream Book) {} } message BookSearch { string name = 1; string author = 2; string genre = 3; } message Book { string name = 1; string author = 2; int32 price = 3; }
以下代码块表示服务的名称"BookStore"和可以调用的函数名称"searchByAuthor"。"searchByAuthor"函数接收类型为"BookSearch"的输入并返回类型为"Book"的流。因此,实际上,我们让客户端搜索标题并返回一个与查询的作者匹配的书籍。
service BookStore { rpc searchByAuthor (BookSearch) returns (stream Book) {} }
现在让我们看看这些类型。
message BookSearch { string name = 1; string author = 2; string genre = 3; }
这里,我们定义了BookSearch,它包含一些属性,如name、author和genre。客户端应该将类型为"BookSearch"的对象发送到服务器。
message Book { string name = 1; string author = 2; int32 price = 3; }
我们还定义了,给定一个"BookSearch",服务器将返回一个包含书籍属性以及书籍价格的"Book"流。服务器应该发送一个"Book"流。
请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以服务调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerStreaming.java中:
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; public class BookeStoreServerUnary { private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException, InterruptedException { final BookeStoreServerUnary greetServer = new BookeStoreServerUnary(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) { logger.info("Searching for book with author: " + searchQuery.getAuthor()); for (Entry<String, Book> bookEntry : bookMap.entrySet()) { try { logger.info("Going through more books...."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){ logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending...."); responseObserver.onNext(bookEntry.getValue()); } } responseObserver.onCompleted(); } } }
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl
服务实例需要提供.proto 文件中存在的 method/function 的实现,即在我们的例子中为searchByAuthor方法。
该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是BookSearch
请注意,我们添加了一个 sleep 来模拟遍历所有书籍的操作。在流式传输的情况下,服务器不会等待所有搜索到的书籍都可用。它在书籍可用时通过使用onNext()调用返回书籍。
当服务器完成请求时,它通过调用onCompleted()关闭通道。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中:
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientServerStreamingBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName()); private final BookStoreGrpc.BookStoreBlockingStub blockingStub; public BookStoreClientServerStreamingBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); } public void getBook((String author) { logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder()..setAuthor(author).build(); Iterator<Book> response; try { response = blockingStub.searchByAuthor(request); while(response.hasNext()) { logger.info("Found book: " + response.next()); } } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } } public static void main(String[] args) throws Exception { String authorName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel); client.getBook(authorName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们接受一个参数,即我们要搜索的书籍的title。
我们为与服务器的 gRPC 通信设置了一个 Channel。
然后,我们使用 channel 创建一个阻塞 stub。在这里,我们选择服务“BookStore”,我们计划调用其函数。
然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为 BookSearch,并添加我们希望服务器搜索的标题。
我们最终进行调用并在有效的 Books 上获取一个迭代器。当我们迭代时,我们得到服务器提供的相应 Books。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端查询服务器以获取具有给定作者的书籍。
服务器在其商店中搜索书籍,这是一个耗时的过程。
服务器在找到符合给定条件的书籍时做出响应。服务器不会等待所有有效的书籍都可用。它在找到一本时立即发送输出。然后重复此过程。
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerStreaming
我们将看到以下输出 -
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
上述输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"
我们将看到以下输出 -
输出
Jul 03, 2021 10:40:31 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Querying for book with author: Har Jul 03, 2021 10:40:37 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Found book: name: "Go Set a Watchman" author: "Harper Lee" price: 700 Jul 03, 2021 10:40:42 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Found book: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。但更重要的是,客户端在不同的时间戳获取了第一本书和第二本书,即大约 5 秒的间隔。
gRPC - 客户端流式 RPC
现在让我们看看在使用 gRPC 通信时客户端流是如何工作的。在这种情况下,客户端将搜索并将书籍添加到购物车。一旦客户端完成添加所有书籍,服务器将向客户端提供结账购物车值。
.proto 文件
首先让我们在common_proto_files中定义bookstore.proto文件:
syntax = "proto3"; option java_package = "com.tp.bookstore"; service BookStore { rpc totalCartValue (stream Book) returns (Cart) {} } message BookSearch { string name = 1; string author = 2; int32 price = 3; } message Cart { int32 books = 1; int32 price = 2; }
这里,以下代码块表示服务名称 **"BookStore"** 和可以调用的函数名称 **"totalCartValue"**。“totalCartValue” 函数接收类型为 **"Book"** 的输入,该输入是一个流。函数返回类型为 "Cart" 的对象。因此,实际上,我们允许客户端以流的方式添加书籍,一旦客户端完成,服务器就会向客户端提供购物车总价值。
service BookStore { rpc totalCartValue (stream Book) returns (Cart) {} }
现在让我们看看这些类型。
message Book { string name = 1; string author = 2; int32 price = 3; }
客户端将发送它想要购买的 **"Book"**。它可能不是完整的书籍信息;它可以仅仅是书籍的标题。
message Cart { int32 books = 1; int32 price = 2; }
服务器在获取书籍列表后,将返回 **"Cart"** 对象,该对象仅仅是客户端购买的书籍总数和总价。
请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 **proto** 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务于上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中 -
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; public class BookeStoreServerClientStreaming { private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException, InterruptedException { final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) { return new StreamObserver<Book>() { ArrayList<Book> bookCart = new ArrayList<Book>(); @Override public void onNext(Book book) logger.info("Searching for book with title starting with: " + book.getName()); for (Entry<String, Book> bookEntry : bookMap.entrySet()) { if(bookEntry.getValue().getName().startsWith(book.getName())){ logger.info("Found book, adding to cart:...."); bookCart.add(bookEntry.getValue()); } } } @Override public void onError(Throwable t) { logger.info("Error while reading book stream: " + t); } @Override public void onCompleted() { int cartValue = 0; for (Book book : bookCart) { cartValue += book.getPrice(); } responseObserver.onNext(Cart.newBuilder() .setPrice(cartValue) .setBooks(bookCart.size()).build()); responseObserver.onCompleted(); } }; } }
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl
服务实例需要提供 **.proto 文件** 中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。
现在,鉴于这是客户端流的情况,服务器将在客户端添加书籍时获得一个 Book 列表(在 **proto** 文件中定义)。因此,服务器返回一个 **自定义流观察器**。此流观察器实现了当找到新 Book 时以及当流关闭时会发生什么。
当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车中。在流式传输的情况下,服务器不会等待所有可用的书籍。
当客户端完成添加书籍时,将调用流观察器的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加书籍时想要发送的内容,即它将 Cart 对象返回给客户端。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中 -
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub; import com.tp.bookstore.BookStoreGrpc.BookStoreStub; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientStreamingClient { private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName()); private final BookStoreStub stub; private boolean serverResponseCompleted = false; StreamObserver<Book> streamClientSender; public BookStoreClientStreamingClient(Channel channel) { stub = BookStoreGrpc.newStub(channel); } public StreamObserver<Cart> getServerResponseObserver(){ StreamObserver<Cart> observer = new StreamObserver<Cart>(){ @Override public void onNext(Cart cart) { logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() + "\nTotal Order Value:" + cart.getPrice()); } @Override public void onCompleted() { //logger.info("Server: Done reading orderreading cart"); serverResponseCompleted = true; } }; return observer; } public void addBook(String book) { logger.info("Adding book with title starting with: " + book); Book request = Book.newBuilder().setName(book).build(); if(streamClientSender == null) { streamClientSender = stub.totalCartValue(getServerResponseObserver()); } try { streamClientSender.onNext(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); } } public void completeOrder() { logger.info("Done, waiting for server to create order summary..."); if(streamClientSender != null); streamClientSender.onCompleted(); } public static void main(String[] args) throws Exception { String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel); String bookName = ""; while(true) { System.out.println("Type book name to be added to the cart...."); bookName = System.console().readLine(); if(bookName.equals("EXIT")) { client.completeOrder(); break; } client.addBook(bookName); } while(client.serverResponseCompleted == false) { Thread.sleep(2000); } } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。
我们为与服务器的 gRPC 通信设置了一个 Channel。
接下来,我们使用创建的通道创建一个 **非阻塞存根**。在这里,我们选择计划调用其函数的服务 **"BookStore"**。
然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中,是 **Book**,并添加我们希望服务器添加的标题。
但是鉴于这是客户端流的情况,我们首先为服务器创建一个流观察器。此服务器流观察器列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**
并且使用存根,我们也获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的书籍数据。最终,我们进行调用并在有效书籍上获取迭代器。当我们迭代时,我们得到服务器提供的相应书籍。
并且一旦我们的订单完成,我们确保客户端流观察器已关闭。它告诉服务器计算购物车价值并将其作为输出提供。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端通过向服务器通知它们来添加书籍流。
服务器在其商店中搜索书籍并将其添加到购物车。
当客户端完成订购时,服务器会响应客户端的购物车总价值。
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出 -
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
上述输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingClient
让我们向我们的客户端添加一些书籍。
Type book name to be added to the cart.... Gr Jul 24, 2021 5:53:07 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... Pa Jul 24, 2021 5:53:20 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Passage Type book name to be added to the cart....
一旦我们添加了书籍并输入“EXIT”,服务器将计算购物车价值,以下是我们获得的输出 -
输出
EXIT Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient completeOrder INFO: Done, waiting for server to create order summary... Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
因此,正如我们所看到的,客户端能够添加书籍。并且一旦所有书籍都添加完毕,服务器将响应书籍总数和总价。
gRPC - 双向 RPC
现在让我们看看在使用 gRPC 通信时客户端-服务器流是如何工作的。在这种情况下,客户端将搜索并将书籍添加到购物车。服务器将在每次添加书籍时响应实时购物车价值。
.proto 文件
首先让我们在common_proto_files中定义bookstore.proto文件:
syntax = "proto3"; option java_package = "com.tp.bookstore"; service BookStore { rpc liveCartValue (stream Book) returns (stream Cart) {} } message Book { string name = 1; string author = 2; int32 price = 3; } message Cart { int32 books = 1; int32 price = 2; }
以下代码块表示服务名称 **"BookStore"** 和可以调用的函数名称 "liveCartValue"。**"liveCartValue"** 函数接收类型为 **"Book"** 的输入,该输入是一个流。函数返回类型为 **"Cart"** 的对象的流。因此,实际上,我们允许客户端以流的方式添加书籍,并且每当添加新书籍时,服务器都会将当前购物车价值响应给客户端。
service BookStore { rpc liveCartValue (stream Book) returns (stream Cart) {} }
现在让我们看看这些类型。
message Book { string name = 1; string author = 2; int32 price = 3; }
客户端将发送它想要购买的 **"Book"**。它不必是完整的书籍信息;它可以仅仅是书籍的标题。
message Cart { int32 books = 1; int32 price = 2; }
服务器在获取书籍列表后,将返回 **"Cart"** 对象,该对象仅仅是客户端购买的书籍总数和总价。
请注意,我们已经完成了 Maven 设置,用于自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目。
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在
Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务于上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; public class BookeStoreServerBothStreaming { private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException,InterruptedException { final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase { @Override public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) { return new StreamObserver<Book>() { ArrayList<Book> bookCart = new ArrayList<Book>(); int cartValue = 0; @Override public void onNext(Book book) { logger.info("Searching for book with titlestarting with: " + book.getName()); for (Entry<String, Book> bookEntry :bookMap.entrySet()) { if(bookEntry.getValue().getName().startsWith(book.getName())){ logger.info("Found book, adding tocart:...."); bookCart.add(bookEntry.getValue()); cartValue +=bookEntry.getValue().getPrice(); } } logger.info("Updating cart value..."); responseObserver.onNext(Cart.newBuilder() .setPrice(cartValue) .setBooks(bookCart.size()).build()); } @Override public void onError(Throwable t) { logger.info("Error while reading book stream: " + t); } @Override public void onCompleted() { logger.info("Order completed"); responseObserver.onCompleted(); } }; } } }
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从 **main** 方法开始,我们在指定的端口上创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中是 **BookStoreImpl**
服务实例需要提供 **proto** 文件中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。
现在,鉴于这是服务器和客户端流的情况,服务器将在客户端添加书籍时获得一个 **Books** 列表(在 **proto** 文件中定义)。因此,服务器返回一个自定义流观察器。此流观察器实现了当找到新 Book 时以及当流关闭时会发生什么。
当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车中并使用响应观察器返回购物车价值。在流式传输的情况下,服务器不会等待所有有效的书籍可用。
当客户端完成添加书籍时,将调用流观察器的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加 **Books** 时想要执行的操作,即声明它已完成接收客户端订单。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub; import com.tp.bookstore.BookStoreGrpc.BookStoreStub; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientBothStreaming { private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName()); private final BookStoreStub stub; private boolean serverIntermediateResponseCompleted = true; private boolean serverResponseCompleted = false; StreamObserver<Book> streamClientSender; public BookStoreClientBothStreaming(Channel channel) { stub = BookStoreGrpc.newStub(channel); } public StreamObserver>Cart< getServerResponseObserver(){ StreamObserver>Cart< observer = new StreamObserver<Cart>(){ @Override public void onNext(Cart cart) { logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks()+ "\nTotal Order Value:" cart.getPrice()); serverIntermediateResponseCompleted = true; } @Override public void onError(Throwable t) { logger.info("Error while reading response fromServer: " + t); } @Override public void onCompleted() { //logger.info("Server: Done reading orderreading cart"); serverResponseCompleted = true; } }; return observer; } public void addBook(String book) { logger.info("Adding book with title starting with: " + book); Book request = Book.newBuilder().setName(book).build(); if(streamClientSender == null) { streamClientSender =stub.liveCartValue(getServerResponseObserver()); } try { streamClientSender.onNext(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); } } public void completeOrder() { logger.info("Done, waiting for server to create ordersummary..."); if(streamClientSender != null); { streamClientSender.onCompleted(); } } public static void main(String[] args) throws Exception { String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientBothStreaming client = new BookStoreClientBothStreaming(channel); String bookName = ""; while(true) { if(client.serverIntermediateResponseCompleted ==true) { System.out.println("Type book name to beadded to the cart...."); bookName = System.console().readLine(); if(bookName.equals("EXIT")) { client.completeOrder(); break; } client.serverIntermediateResponseCompleted = false; client.addBook(bookName); Thread.sleep(500); } } while(client.serverResponseCompleted == false) { Thread.sleep(2000); } } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
以上代码启动一个 gRPC 客户端并连接到指定端口上的服务器,并调用我们在 **proto** 文件中编写的函数和服务。让我们一起浏览以上代码 -
从 **main** 方法开始,我们接受要添加到购物车的书籍名称。一旦所有书籍都将要添加,用户预计会打印“EXIT”。
我们为与服务器的 gRPC 通信设置了一个 Channel。
接下来,我们使用通道创建一个非阻塞存根。在这里,我们选择计划调用其函数的服务 **"BookStore"**。
然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中,是 **Book**,并添加我们希望服务器添加的标题。
但是鉴于这是服务器和客户端流的情况,我们首先为服务器创建一个 **流观察器**。此服务器流观察器列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**。
并且使用存根,我们也获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的书籍数据。
并且一旦我们的订单完成,我们确保客户端流观察器已关闭。这告诉服务器关闭流并执行清理。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端通过向服务器通知它们来添加书籍流。
服务器在其商店中搜索书籍并将其添加到购物车。
每次添加书籍时,服务器都会告诉客户端购物车价值。
当客户端完成订购时,服务器和客户端都会关闭流。
现在我们已经定义了我们的 **proto** 文件,编写了我们的服务器和客户端代码,让我们现在执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出 -
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
此输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientBothStreaming
让我们向我们的客户端添加一本书。
Jul 24, 2021 7:21:45 PM com.tp.bookstore.BookStoreClientBothStreaming main Type book name to be added to the cart.... Great Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Gr Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 1 Total Order Value: 300
因此,正如我们所看到的,我们得到了订单的当前购物车价值。现在让我们向我们的客户端再添加一本书。
Type book name to be added to the cart.... Passage Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Pa Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
一旦我们添加了书籍并输入“EXIT”,客户端将关闭。
Type book name to be added to the cart.... EXIT Jul 24, 2021 7:21:59 PM com.tp.bookstore.BookStoreClientBothStreaming completeOrder INFO: Done, waiting for server to create order summary...
因此,正如我们所看到的,客户端能够添加书籍。并且随着书籍的添加,服务器将响应当前购物车价值。
gRPC - 客户端调用
gRPC 客户端支持两种类型的客户端调用,即客户端如何调用服务器。以下是两种方式 -
阻塞式客户端调用
异步客户端调用
在本章中,我们将逐一查看它们。
阻塞式客户端调用
gRPC 支持阻塞式客户端调用。这意味着,一旦客户端对服务进行调用,客户端将不会继续执行其余代码,直到它从服务器收到响应。请注意,对于单向调用和服务器流式调用,阻塞式客户端调用是可能的。
请注意,对于单向调用和服务器流式调用,阻塞式客户端调用是可能的。
这是一个单向阻塞式客户端调用的示例。
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientUnaryBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName()); private final BookStoreGrpc.BookStoreBlockingStubblockingStub; public BookStoreClientUnaryBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); } public void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; try { response = blockingStub.first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Got following book from server: " + response); } public static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientUnaryBlocking client = new BookStoreClientUnaryBlocking(channel); client.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
在以上示例中,我们有,
public BookStoreClientUnaryBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); }
这意味着我们将使用阻塞式 RPC 调用。
然后,我们有,
BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; response = blockingStub.first(request);
在这里,我们使用 **blockingStub** 调用 RPC **method first()** 来获取书籍详细信息。
类似地,对于服务器流,我们可以使用阻塞式存根 -
logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder().setAuthor(author).build(); Iterator<Book> response; try { response = blockingStub.searchByAuthor(request); while(response.hasNext()) { logger.info("Found book: " + response.next()); }
在这里,我们调用 RPC 方法 **searchByAuthor** 方法,并迭代响应,直到服务器流结束。
非阻塞式客户端调用
gRPC 支持非阻塞式客户端调用。这意味着,当客户端对服务进行调用时,它不需要等待服务器响应。为了处理服务器响应,客户端可以简单地传递观察器,该观察器指示在收到响应时该做什么。请注意,对于单向调用和流式调用,非阻塞式客户端调用是可能的。但是,我们将专门关注服务器流式调用的情况,以将其与阻塞式调用进行比较。
请注意,对于单向调用和流式调用,非阻塞式客户端调用是可能的。但是,我们将专门关注服务器流式调用的情况,以将其与阻塞式调用进行比较。
这是一个服务器流式非阻塞式客户端调用的示例。
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientServerStreamingNonBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName()); private final BookStoreGrpc.BookStoreStub nonBlockingStub; public BookStoreClientServerStreamingNonBlocking(Channelchannel) { nonBlockingStub = BookStoreGrpc.newStub(channel); } public StreamObserver<Book> getServerResponseObserver(){ StreamObserver<Book> observer = new StreamObserver<Book>(){ @Override public void onNext(Book book) { logger.info("Server returned following book: " +book); } @Override public void onError(Throwable t) { logger.info("Error while reading response fromServer: " + t); } @Override public void onCompleted() { logger.info("Server returned following book: " + book); } }; return observer; } public void getBook(String author) { logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder().setAuthor(author).build(); try { nonBlockingStub.searchByAuthor(request,getServerResponseObserver()); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus()); return; } } public static void main(String[] args) throws Exception { String authorName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientServerStreamingNonBlocking client = new BookStoreClientServerStreamingNonBlocking(channel); client.getBook(authorName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
正如我们在以上示例中看到的,
public BookStoreClientUnaryNonBlocking(Channel channel) { nonBlockingStub = BookStoreGrpc.newStub(channel); }
它定义了存根是非阻塞式的。类似地,以下代码用于处理我们从服务器接收到的响应。一旦服务器发送响应,我们就记录输出。
public StreamObserver<Book> getServerResponseObserver(){ StreamObserver<Book> observer = new StreamObserver<Book>(){ .... .... return observer; }
以下 gRPC 调用是非阻塞式调用。
logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder().setAuthor(author).build(); try { nonBlockingStub.searchByAuthor(request, getServerResponseObserver()); }
这就是我们确保客户端不需要等到服务器完成 **searchByAuthor** 执行的方式。这将由流观察器对象在服务器返回 Book 对象时直接处理。
gRPC - 超时和取消
gRPC 支持为请求分配超时。这是一种执行请求取消的方法。它有助于避免为客户端和服务器使用资源,以执行其结果对客户端无用的请求。
请求超时
gRPC 支持为客户端和服务器指定超时。
客户端可以在运行时指定它希望等待多长时间,然后取消请求。
服务器也可以在其端检查请求是否需要处理,或者客户端是否已放弃请求。
让我们来看一个例子,客户端期望在2秒内收到响应,但服务器花费了更长的时间。所以,这是我们的服务器代码。
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; public class BookeStoreServerUnaryTimeout { private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException,InterruptedException { final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) { logger.info("Searching for book with title: " +searchQuery.getName()); logger.info("This may take more time..."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } List<String> matchingBookTitles = bookMap.keySet().stream() .filter(title -> title.startsWith(searchQuery.getName().trim())) .collect(Collectors.toList()); Book foundBook = null; if(matchingBookTitles.size() > 0) { foundBook = bookMap.get(matchingBookTitles.get(0)); } responseObserver.onNext(foundBook); responseObserver.onCompleted(); } } }
在上面的代码中,服务器搜索客户端提供的书籍标题。我们添加了一个模拟休眠,以便我们可以看到请求超时。
这是我们的客户端代码
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientUnaryBlockingTimeout { private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName()); private final BookStoreGrpc.BookStoreBlockingStubblockingStub; public BookStoreClientUnaryBlockingTimeout(Channel channel){ blockingStub = BookStoreGrpc.newBlockingStub(channel); } public void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request =BookSearch.newBuilder().setName(bookName).build(); Book response; try { response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus()); return; } logger.info("Got following book from server: " +response); } public static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build(); try { BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel); client.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上面的代码使用标题调用服务器进行搜索。但更重要的是,它为gRPC调用提供了2秒的超时时间。
现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
我们将看到以下输出 -
输出
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start INFO: Server started, listening on 50051
以上输出表明服务器已启动。
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: Searching for book with title: Great Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: This may take more time...
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
我们将得到以下输出:
输出
Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook INFO: Querying for book with title: Great Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}
因此,正如我们所看到的,客户端在2秒内没有收到响应,因此它取消了请求并将其称为超时,即DEADLINE_EXCEEDED
请求取消
gRPC支持客户端和服务器端都取消请求。客户端可以在运行时指定它希望在取消请求之前等待的时间。服务器也可以在其端检查请求是否需要处理,或者客户端是否已经放弃了请求。
让我们看一个客户端流式传输的例子,其中客户端调用取消。所以,这是我们的服务器代码
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; public class BookeStoreServerClientStreaming { private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException,InterruptedException { final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase { @Override public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) { return new StreamObserver<Book>() { ArrayList<Book> bookCart = new ArrayList<Book>(); @Override public void onNext(Book book) { logger.info("Searching for book with titlestarting with: " + book.getName()); for (Entry<String, Book> bookEntry : bookMap.entrySet()) { if(bookEntry.getValue().getName().startsWith(book.getName())){ logger.info("Found book, adding to cart:...."); bookCart.add(bookEntry.getValue()); } } } @Override public void onError(Throwable t) { logger.info("Error while reading book stream:" + t); } @Override public void onCompleted() { int cartValue = 0; for (Book book : bookCart) { cartValue += book.getPrice(); } responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build()); responseObserver.onCompleted(); } }; } } }
此服务器代码是客户端流式传输的一个简单示例。服务器简单地跟踪客户端想要的书籍,最后,它提供订单的总购物车价值。
但是这里关于请求取消没有什么特别之处,因为这是客户端将调用的。所以,让我们看看客户端代码。
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.Context; import io.grpc.Context.CancellableContext; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreGrpc.BookStoreStub; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.Cart; public class BookStoreClientStreamingClientCancelation { private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName()); private final BookStoreStub stub; StreamObserver<Book> streamClientSender; private CancellableContext withCancellation; public BookStoreClientStreamingClientCancelation(Channel channel) { stub = BookStoreGrpc.newStub(channel); } public StreamObserver<Cart> getServerResponseObserver(){ StreamObserver<Cart> observer = new StreamObserver>Cart<(){ @Override public void onNext(Cart cart) { logger.info("Order summary:" + "\nTotal number of Books: " + cart.getBooks() + "\nTotal Order Value: " + cart.getPrice()); } @Override public void onError(Throwable t) { logger.info("Error while reading response from Server: " + t); } @Override public void onCompleted() { } }; return observer; } public void addBook(String book) { logger.info("Adding book with title starting with: " + book); Book request = Book.newBuilder().setName(book).build(); if(streamClientSender == null) { withCancellation = Context.current().withCancellation(); streamClientSender = stub.totalCartValue(getServerResponseObserver()); } try { streamClientSender.onNext(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); } } public void completeOrder() { logger.info("Done, waiting for server to create order summary..."); if(streamClientSender != null); streamClientSender.onCompleted(); } public void cancelOrder() { withCancellation.cancel(null); } public static void main(String[] args) throws Exception { String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build(); try { BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = ""; while(true) { System.out.println("Type book name to be added to the cart...."); bookName = System.console().readLine(); if(bookName.equals("EXIT")) { client.completeOrder(); break; } if(bookName.equals("CANCEL")) { client.cancelOrder(); break; } client.addBook(bookName); } } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
所以,如果我们看到上面的代码,以下行定义了一个启用了取消的上下文。
withCancellation = Context.current().withCancellation();
这是当用户输入取消时将调用的方法。这将取消订单,并让服务器知道。
public void cancelOrder() { withCancellation.cancel(null); }
现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出:
输出
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
我们将得到以下输出:
输出
Type book name to be added to the cart.... Great Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... CANCEL Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
我们将在服务器日志中获得以下数据:
INFO: Searching for book with title starting with: Great Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onNext INFO: Found book, adding to cart:.... Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onError INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
因此,正如我们所看到的,客户端启动了对其发送到服务器的请求的取消。服务器也收到了关于取消的通知。
gRPC - 发送/接收元数据
gRPC支持发送元数据。元数据基本上是我们想要发送的一组数据,这些数据不是业务逻辑的一部分,同时进行gRPC调用。
让我们看看以下两种情况:
- 客户端发送元数据,服务器读取它。
- 服务器发送元数据,客户端读取它。
我们将逐一介绍这两种情况。
客户端发送元数据
如前所述,gRPC支持客户端发送服务器可以读取的元数据。gRPC支持扩展客户端和服务器拦截器,这些拦截器可分别用于写入和读取元数据。让我们举一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:
让我们举一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:
示例
package com.tp.bookstore; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; public class BookStoreClientUnaryBlockingMetadata { private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName()); private final BookStoreGrpc.BookStoreBlockingStub blockingStub; public BookStoreClientUnaryBlockingMetadata(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); } static class BookClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next ) { return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { @Override public void start(Listener<RespT> responseListener, Metadata headers) { logger.info("Added metadata"); headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST"); super.start(responseListener, headers); } }; } } public void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key"); try { response = blockingStub.withOption(metaDataKey, "bar").first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Got following book from server: " + response); } public static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build(); try { BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel); client.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
这里有趣的部分是拦截器。
static class BookClientInterceptor implements ClientInterceptor{ @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { @Override public void start(Listener<RespT>responseListener, Metadata headers) { logger.info("Added metadata"); headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST"); super.start(responseListener, headers); } }; } }
我们拦截客户端发出的任何调用,然后在进一步调用之前向其添加主机名元数据。
服务器读取元数据
现在,让我们看看读取此元数据的服务器代码:
package com.tp.bookstore; import io.grpc.CallOptions; import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCall; import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; public class BookeStoreServerMetadata { private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build()); } private Server server; class BookServerInterceptor implements ServerInterceptor{ @Override public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { logger.info("Recieved following metadata: " + headers); return next.startCall(call, headers); } } private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException, InterruptedException { final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) { logger.info("Searching for book with title: " + searchQuery.getName()); List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList()); Book foundBook = null; if(matchingBookTitles.size() > 0) { foundBook = bookMap.get(matchingBookTitles.get(0)); } responseObserver.onNext(foundBook); responseObserver.onCompleted(); } } }
同样,这里有趣的部分是拦截器。
class BookServerInterceptor implements ServerInterceptor{ @Override public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) { logger.info("Recieved following metadata: " + headers); return next.startCall(call, headers); } }
我们拦截任何传入服务器的调用,然后在实际方法处理调用之前记录元数据。
客户端-服务器调用
现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata
我们将看到以下输出 -
输出
Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great
我们将看到以下输出 -
输出
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Querying for book with title: Great Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli entInterceptor$1 start INFO: Added metadata Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Got following book from server: name: "Great Gatsby" author: "Scott Fitzgerald" price: 300
我们将在服务器日志中获得以下数据:
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept or interceptCall INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip) Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first INFO: Searching for book with title: Great
正如我们所看到的,服务器能够读取元数据:hostname=MY_HOST,该元数据由客户端添加。