gRPC - 快速指南
gRPC - 简介
在深入了解 gRPC 之前,让我们先简单了解一下远程过程调用,也就是 gRPC 所做的事情。
什么是远程过程调用?
远程过程调用是看起来像普通/本地函数调用一样的函数调用,但不同之处在于远程函数调用通常在不同的机器上执行。但是,对于编写代码的开发人员来说,函数调用和远程调用之间的区别很小。这些调用通常遵循客户端-服务器模型,其中执行调用的机器充当服务器。
为什么我们需要远程过程调用?
远程过程调用提供了一种在另一台机器上执行代码的方法。在大型、庞大的产品中,这变得至关重要,因为单台机器无法承载产品正常运行所需的所有代码。
在微服务架构中,应用程序被分解成小的服务,这些服务通过消息队列和 API 相互通信。所有这些通信都发生在网络上,不同的机器/节点根据它们所承载的服务提供不同的功能。因此,在分布式环境中工作时,创建远程过程调用成为一个关键方面。
为什么选择 gRPC?
Google 远程过程调用 (gRPC) 提供了一个执行远程过程调用的框架。但是,还有一些其他库和机制可以在远程机器上执行代码。那么,是什么让 gRPC 如此特别呢?让我们来了解一下。
语言无关性 - gRPC 在内部使用 Google Protocol Buffer。因此,可以使用多种语言,例如 Java、Python、Go、Dart 等。Java 客户端可以进行过程调用,而使用 Python 的服务器可以做出响应,从而有效地实现语言无关性。
高效的数据压缩 - 在微服务环境中,考虑到网络上会发生多次通信,因此我们发送的数据尽可能简洁至关重要。我们需要避免任何多余的数据,以确保数据快速传输。鉴于 gRPC 在内部使用 Google Protocol Buffer,因此它可以利用此功能。
高效的序列化和反序列化 - 在微服务环境中,考虑到网络上会发生多次通信,因此我们尽可能快速地序列化和反序列化数据至关重要。鉴于 gRPC 在内部使用 Google Protocol Buffer,因此它可以确保快速序列化和反序列化数据。
易于使用 - gRPC 已经拥有一个库和插件,可以自动生成过程代码(我们将在后续章节中看到)。对于简单的用例,它可以像本地函数调用一样使用。
gRPC 与使用 JSON 的 REST 的比较
让我们看看其他通过网络传输数据的方式与 Protobuf 相比如何。
| 功能 | gRPC | 使用 JSON/XML 的 HTTP |
|---|---|---|
| 语言无关性 | 是 | 是 |
| HTTP 版本 | HTTP/2 | HTTP 1.1 |
| 指定域模式 | .proto 文件(Google Protocol Buffer) | 无 |
| 序列化数据大小 | 最小 | 高(XML 更高) |
| 人类可读性 | 否,因为它使用单独的编码模式 | 是,因为它使用基于文本的格式 |
| 序列化速度 | 最快 | 较慢(XML 最慢) |
| 数据类型支持 | 更丰富。支持复杂数据类型,如 Any、oneof 等。 | 支持基本数据类型 |
| 支持演变模式 | 是 | 否 |
gRPC - 设置
Protoc 设置
请注意,此设置仅适用于 Python。对于 Java,所有这些都由 Maven 文件处理。让我们安装“proto”二进制文件,我们将使用它来自动生成“.proto”文件的代码。二进制文件可以在 https://github.com/protocolbuffers/protobuf/releases/ 中找到。根据操作系统选择正确的二进制文件。我们将在 Windows 上安装 proto 二进制文件,但 Linux 的步骤差别不大。
安装完成后,确保您可以通过命令行访问它 -
protoc --version libprotoc 3.15.6
这意味着 Protobuf 已正确安装。现在,让我们转到项目结构。
我们还需要设置 gRPC 代码生成所需的插件。
对于 Python,我们需要执行以下命令 -
python -m pip install grpcio python -m pip install grpcio-tools
它将安装所有必需的二进制文件并将它们添加到路径中。
项目结构
以下是我们将拥有的整体项目结构 -
与各个语言相关的代码进入各自的目录。我们将有一个单独的目录来存储我们的 proto 文件。并且,以下是我们将用于 Java 的项目结构 -
项目依赖项
现在我们已经安装了protoc,我们可以使用protoc从 proto 文件自动生成代码。让我们先创建一个 Java 项目。
以下是我们将用于 Java 项目的 Maven 配置。请注意,它还包含 Protobuf 所需的库。
示例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.grpc.point</groupId>
<artifactId>grpc-point</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.38.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4jsimple-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<id>test</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpcjava:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>../common_proto_files</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<configuration>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
gRPC - 使用 Java 的 Hello World 应用
现在让我们创建一个基本的类似“Hello World”的应用程序,它将使用 gRPC 和 Java。
.proto 文件
首先让我们在common_proto_files中定义“greeting.proto”文件 -
syntax = "proto3";
option java_package = "com.tp.greeting";
service Greeter {
rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
string greeting = 1;
string name = 2;
}
message ServerOutput {
string message = 1;
}
现在让我们仔细看看这个代码块,并了解每一行的作用 -
syntax = "proto3";
此处的“syntax”表示我们正在使用的 Protobuf 版本。因此,我们使用最新版本 3,因此模式可以使用所有对版本 3 有效的语法。
package tutorial;
此处的包用于冲突解决,例如,如果我们有多个名称相同的类/成员。
option java_package = "com.tp.greeting";
此参数特定于 Java,即从“.proto”文件自动生成代码的包。
service Greeter {
rpc greet(ClientInput) returns (ServerOutput) {}
}
此块表示服务“Greeter”的名称和可以调用的函数名称“greet”。“greet”函数接收类型为“ClientInput”的输入并返回类型为“ServerOutput”的输出。现在让我们看看这些类型。
message ClientInput {
string greeting = 1;
string name = 2;
}
在上面的代码块中,我们定义了ClientInput,它包含两个属性“greeting”和“name”,它们都是字符串。客户端应该将类型为“ClientInput”的对象发送到服务器。
message ServerOutput {
string message = 1;
}
在上面的代码块中,我们定义了,给定一个“ClientInput”,服务器将返回“ServerOutput”,它将包含一个名为“message”的属性。服务器应该将类型为“ServerOutput”的对象发送到客户端。
请注意,我们已经完成了 Maven 设置以自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目 -
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.greeting Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.greeting
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以服务于调用这些函数的服务器。
让我们编写服务器代码来服务于上述函数,并将其保存在com.tp.grpc.GreetServer.java中 -
示例
package com.tp.grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ClientInput;
import com.tp.greeting.Greeting.ServerOutput;
public class GreetServer {
private static final Logger logger = Logger.getLogger(GreetServer.class.getName());
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void greet(ClientInput req, StreamObserver<ServerOutput> responseObserver) {
logger.info("Got request from client: " + req);
ServerOutput reply = ServerOutput.newBuilder().setMessage(
"Server says " + "\"" + req.getGreeting() + " " + req.getName() + "\""
).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final GreetServer greetServer = new GreetServer();
greetServer.start();
greetServer.server.awaitTermination();
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中,是Greeter服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中,是GreeterImpl。
服务实例需要提供“.proto”文件中存在的函数/方法的实现,即在我们的例子中,是greet方法。
该方法期望一个类型与“.proto”文件中定义的类型相同的对象,即在我们的例子中,是ClientInput。
该方法处理上述输入,执行计算,然后应该在“.proto”文件中返回提到的输出,即在我们的例子中,是ServerOutput。
最后,我们还有一个shutdown钩子,以确保在完成代码执行后服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写客户端代码来调用上述函数,并将其保存在com.tp.grpc.GreetClient.java中 -
示例
package com.tp.grpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class GreetClient {
private static final Logger logger = Logger.getLogger(GreetClient.class.getName());
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public GreetClient(Channel channel) {
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void makeGreeting(String greeting, String username) {
logger.info("Sending greeting to server: " + greeting + " for name: " + username);
ClientInput request = ClientInput.newBuilder().setName(username).setGreeting(greeting).build();
logger.info("Sending to server: " + request);
ServerOutput response;
try {
response = blockingStub.greet(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following from the server: " + response.getMessage());
}
public static void main(String[] args) throws Exception {
String greeting = args[0];
String username = args[1];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
GreetClient client = new GreetClient(channel);
client.makeGreeting(greeting, username);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们接受两个参数,即name和greeting。
我们为 gRPC 与服务器的通信设置了一个 Channel。
接下来,我们使用创建的 Channel 创建一个阻塞式存根。在这里,我们拥有服务“Greeter”,我们计划调用它的函数。存根只不过是一个包装器,它隐藏了远程调用的复杂性,使调用者无需关心。
然后,我们简单地创建“.proto”文件中定义的预期输入,即在我们的例子中,是ClientInput。
我们最终进行调用并等待服务器的响应。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际效果。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetServer
我们将看到以下输出 -
输出
Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetClient Hello Jane
我们将看到以下输出 -
输出
Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet INFO: Sending greeting to server: Hello for name: Jane Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet INFO: Sending to server: greeting: "Hello" name: "Jane" Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetClient greet INFO: Got following from the server: Server says "Hello Jane"
现在,如果我们打开服务器日志,我们将看到以下内容 -
Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start INFO: Server started, listening on 50051 Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetServer$GreeterImpl greet INFO: Got request from client: greeting: "Hello" name: "Jane"
因此,客户端能够按预期调用服务器,并且服务器通过向客户端发送问候进行了响应。
gRPC - 使用 Python 的 Hello World 应用
现在让我们创建一个基本的类似“Hello World”的应用程序,它将使用 gRPC 和 Python。
.proto 文件
首先让我们在common_proto_files中定义greeting.proto文件 -
syntax = "proto3";
service Greeter {
rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
string greeting = 1;
string name = 2;
}
message ServerOutput {
string message = 1;
}
现在让我们仔细看看上面代码块中的每一行 -
syntax = "proto3";
此处的“syntax”表示我们正在使用的 Protobuf 版本。因此,我们使用最新版本 3,因此模式可以使用所有对版本 3 有效的语法。
package tutorial;
此处的package用于冲突解决,例如,如果我们有多个名称相同的类/成员。
service Greeter {
rpc greet(ClientInput) returns (ServerOutput) {}
}
此块表示服务“Greeter”的名称和可以调用的函数名称“greet”。“greet”函数接收类型为“ClientInput”的输入并返回类型为“ServerOutput”的输出。现在让我们看看这些类型。
message ClientInput {
string greeting = 1;
string name = 2;
}
在上面的代码块中,我们定义了ClientInput,它包含两个属性“greeting”和“name”,它们都是字符串。客户端应该将类型为“ClientInput”的对象发送到服务器。
message ServerOutput {
string message = 1;
}
这里,我们还定义了,给定一个“ClientInput”,服务器将返回一个包含单个属性“message”的“ServerOutput”。服务器应该将类型为“ServerOutput”的对象发送到客户端。
现在,让我们为 Protobuf 类和 gRPC 类生成底层代码。为此,我们需要执行以下命令:
python -m grpc_tools.protoc -I ..\common_proto_files\ -- python_out=../python --grpc_python_out=. greeting.proto
但是,请注意,要执行该命令,我们需要安装教程的设置部分中提到的正确依赖项。
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: python/greeting_pb2.py Protobuf gRPC code: python/greeting_pb2_grpcpb2.py
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在server.py中:
示例
from concurrent import futures
import grpc
import greeting_pb2
import greeting_pb2_grpc
class Greeter(greeting_pb2_grpc.GreeterServicer):
def greet(self, request, context):
print("Got request " + str(request))
return greeting_pb2.ServerOutput(message='{0} {1}!'.format(request.greeting, request.name))
def server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
print("gRPC starting")
server.start()
server.wait_for_termination()
server()
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们在指定的端口上创建一个 gRPC 服务器。
但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中,是Greeter服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为Greeter。
服务实例需要提供.proto文件中存在的 method/function 的实现,即在我们的例子中为greet方法。
该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是request。
该方法处理上述输入,进行计算,然后应该返回.proto文件中提到的输出,即在我们的例子中为ServerOutput。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在client.py中:
示例
import grpc
import greeting_pb2
import greeting_pb2_grpc
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = greeting_pb2_grpc.GreeterStub(channel)
response = stub.greet(greeting_pb2.ClientInput(name='John', greeting = "Yo"))
print("Greeter client received following from server: " + response.message)
run()
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们为与服务器的 gRPC 通信设置了一个 Channel。
然后,我们使用 channel 创建一个stub。在这里,我们使用服务“Greeter”,我们计划调用其函数。stub 只是一个包装器,它隐藏了远程调用对调用者的复杂性。
然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为ClientInput。我们硬编码了两个参数,即name和greeting。
我们最终进行调用并等待服务器的结果。
因此,这就是我们的客户端代码。
客户端服务器调用
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
python .\server.py
输出
我们将得到以下输出:
gRPC starting
上述输出表示服务器已启动。
现在,让我们启动客户端。
python .\client.py
我们将看到以下输出 -
输出
Greeter client received following from server: Yo John!
现在,如果我们打开服务器日志,我们将看到以下数据:
gRPC starting Got request greeting: "Yo" name: "John"
因此,正如我们所看到的,客户端能够按预期调用服务器,并且服务器响应了向客户端回送问候。
gRPC - 一元 gRPC
我们现在将了解 gRPC 框架支持的各种类型的通信。我们将使用书店的例子,客户可以在其中搜索和下单送书。
让我们看看一元 gRPC 通信,我们让客户端搜索标题并随机返回一个与查询的标题匹配的书籍。
.proto 文件
首先让我们在 common_proto_files 中定义 bookstore.proto 文件:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
现在让我们仔细看看上面代码块中的每一行。
syntax = "proto3";
语法
这里的“语法”表示我们使用的 Protobuf 版本。我们使用最新的版本 3,因此该模式可以使用所有对版本 3 有效的语法。
package tutorial;
此处的package用于冲突解决,例如,如果我们有多个名称相同的类/成员。
option java_package = "com.tp.bookstore";
此参数特定于 Java,即从 .proto 文件自动生成代码的包。
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
这表示服务的名称"BookStore"和可以调用的函数名称"first"。"first"函数接收类型为"BookSearch"的输入并返回类型为"Book"的输出。因此,实际上,我们让客户端搜索标题并返回一个与查询的标题匹配的书籍。
现在让我们看看这些类型。
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
在上面的代码块中,我们定义了 BookSearch,它包含诸如name、author和genre之类的属性。客户端应该将类型为"BookSearch"的对象发送到服务器。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
这里,我们还定义了,给定一个“BookSearch”,服务器将返回一个包含书籍属性以及书籍价格的“Book”。服务器应该将类型为“Book”的对象发送到客户端。
请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerUnary.java中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " + searchQuery.getName());
List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->
title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl
服务实例需要提供.proto文件中存在的 method/function 的实现,即在我们的例子中为first方法。
该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是BookSearch
该方法在可用的 bookMap 中搜索书籍,然后通过调用onNext()方法返回Book。完成后,服务器通过调用onCompleted()宣布它已完成输出。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientUnaryBlocking.java中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。
我们为与服务器的 gRPC 通信设置了一个 Channel。
然后,我们使用 channel 创建一个阻塞 stub。在这里,我们选择服务“BookStore”,我们计划调用其函数。“stub”只是一个包装器,它隐藏了远程调用对调用者的复杂性。
然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为BookSearch,并添加我们希望服务器搜索的标题名称。
我们最终进行调用并等待服务器的结果。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端查询服务器以获取具有给定名称/标题的书籍。
服务器在其商店中搜索书籍。
服务器然后响应书籍及其其他属性。
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnary
我们将看到以下输出 -
输出
Jul 03, 2021 7:21:58 PM com.tp.bookstore.BookeStoreServerUnary start INFO: Server started, listening on 50051
上述输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"
我们将看到以下输出 -
输出
Jul 03, 2021 7:22:03 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Querying for book with title: To Kill Jul 03, 2021 7:22:04 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Got following book from server: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。
gRPC - 服务器端流式 RPC
现在让我们讨论在使用 gRPC 通信时服务器流是如何工作的。在这种情况下,客户端将搜索具有给定作者的书籍。假设服务器需要一些时间才能遍历所有书籍。服务器不是在遍历完所有书籍后等待提供所有书籍,而是会以流式方式提供书籍,即一旦找到一本就提供一本。
.proto 文件
首先让我们在 common_proto_files 中定义 bookstore.proto 文件:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc first (BookSearch) returns (stream Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
以下代码块表示服务的名称"BookStore"和可以调用的函数名称"searchByAuthor"。"searchByAuthor"函数接收类型为"BookSearch"的输入并返回类型为"Book"的流。因此,实际上,我们让客户端搜索标题并返回一个与查询的作者匹配的书籍。
service BookStore {
rpc searchByAuthor (BookSearch) returns (stream Book) {}
}
现在让我们看看这些类型。
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
这里,我们定义了BookSearch,它包含一些属性,如name、author和genre。客户端应该将类型为"BookSearch"的对象发送到服务器。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
我们还定义了,给定一个"BookSearch",服务器将返回一个包含书籍属性以及书籍价格的"Book"流。服务器应该发送一个"Book"流。
请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以服务调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerStreaming.java中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with author: " + searchQuery.getAuthor());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
try {
logger.info("Going through more books....");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");
responseObserver.onNext(bookEntry.getValue());
}
}
responseObserver.onCompleted();
}
}
}
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl
服务实例需要提供.proto 文件中存在的 method/function 的实现,即在我们的例子中为searchByAuthor方法。
该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是BookSearch
请注意,我们添加了一个 sleep 来模拟遍历所有书籍的操作。在流式传输的情况下,服务器不会等待所有搜索到的书籍都可用。它在书籍可用时通过使用onNext()调用返回书籍。
当服务器完成请求时,它通过调用onCompleted()关闭通道。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientServerStreamingBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientServerStreamingBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook((String author) {
logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder()..setAuthor(author).build();
Iterator<Book> response;
try {
response = blockingStub.searchByAuthor(request);
while(response.hasNext()) {
logger.info("Found book: " + response.next());
}
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
}
public static void main(String[] args) throws Exception {
String authorName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel);
client.getBook(authorName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们接受一个参数,即我们要搜索的书籍的title。
我们为与服务器的 gRPC 通信设置了一个 Channel。
然后,我们使用 channel 创建一个阻塞 stub。在这里,我们选择服务“BookStore”,我们计划调用其函数。
然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为 BookSearch,并添加我们希望服务器搜索的标题。
我们最终进行调用并在有效的 Books 上获取一个迭代器。当我们迭代时,我们得到服务器提供的相应 Books。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端查询服务器以获取具有给定作者的书籍。
服务器在其商店中搜索书籍,这是一个耗时的过程。
服务器在找到符合给定条件的书籍时做出响应。服务器不会等待所有有效的书籍都可用。它在找到一本时立即发送输出。然后重复此过程。
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerStreaming
我们将看到以下输出 -
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
上述输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"
我们将看到以下输出 -
输出
Jul 03, 2021 10:40:31 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Querying for book with author: Har Jul 03, 2021 10:40:37 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Found book: name: "Go Set a Watchman" author: "Harper Lee" price: 700 Jul 03, 2021 10:40:42 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Found book: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。但更重要的是,客户端在不同的时间戳获取了第一本书和第二本书,即大约 5 秒的间隔。
gRPC - 客户端流式 RPC
现在让我们看看在使用 gRPC 通信时客户端流是如何工作的。在这种情况下,客户端将搜索并将书籍添加到购物车。一旦客户端完成添加所有书籍,服务器将向客户端提供结账购物车值。
.proto 文件
首先让我们在common_proto_files中定义bookstore.proto文件:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
这里,以下代码块表示服务名称 **"BookStore"** 和可以调用的函数名称 **"totalCartValue"**。“totalCartValue” 函数接收类型为 **"Book"** 的输入,该输入是一个流。函数返回类型为 "Cart" 的对象。因此,实际上,我们允许客户端以流的方式添加书籍,一旦客户端完成,服务器就会向客户端提供购物车总价值。
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
现在让我们看看这些类型。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
客户端将发送它想要购买的 **"Book"**。它可能不是完整的书籍信息;它可以仅仅是书籍的标题。
message Cart {
int32 books = 1;
int32 price = 2;
}
服务器在获取书籍列表后,将返回 **"Cart"** 对象,该对象仅仅是客户端购买的书籍总数和总价。
请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 **proto** 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务于上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book)
logger.info("Searching for book with title starting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从main方法开始,我们在指定的端口创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl
服务实例需要提供 **.proto 文件** 中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。
现在,鉴于这是客户端流的情况,服务器将在客户端添加书籍时获得一个 Book 列表(在 **proto** 文件中定义)。因此,服务器返回一个 **自定义流观察器**。此流观察器实现了当找到新 Book 时以及当流关闭时会发生什么。
当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车中。在流式传输的情况下,服务器不会等待所有可用的书籍。
当客户端完成添加书籍时,将调用流观察器的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加书籍时想要发送的内容,即它将 Cart 对象返回给客户端。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientStreamingClient {
private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
private final BookStoreStub stub;
private boolean serverResponseCompleted = false;
StreamObserver<Book> streamClientSender;
public BookStoreClientStreamingClient(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver<Cart>(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() +
"\nTotal Order Value:" + cart.getPrice());
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
serverResponseCompleted = true;
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.addBook(bookName);
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -
从main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。
我们为与服务器的 gRPC 通信设置了一个 Channel。
接下来,我们使用创建的通道创建一个 **非阻塞存根**。在这里,我们选择计划调用其函数的服务 **"BookStore"**。
然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中,是 **Book**,并添加我们希望服务器添加的标题。
但是鉴于这是客户端流的情况,我们首先为服务器创建一个流观察器。此服务器流观察器列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**
并且使用存根,我们也获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的书籍数据。最终,我们进行调用并在有效书籍上获取迭代器。当我们迭代时,我们得到服务器提供的相应书籍。
并且一旦我们的订单完成,我们确保客户端流观察器已关闭。它告诉服务器计算购物车价值并将其作为输出提供。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端通过向服务器通知它们来添加书籍流。
服务器在其商店中搜索书籍并将其添加到购物车。
当客户端完成订购时,服务器会响应客户端的购物车总价值。
现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出 -
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
上述输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingClient
让我们向我们的客户端添加一些书籍。
Type book name to be added to the cart.... Gr Jul 24, 2021 5:53:07 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... Pa Jul 24, 2021 5:53:20 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Passage Type book name to be added to the cart....
一旦我们添加了书籍并输入“EXIT”,服务器将计算购物车价值,以下是我们获得的输出 -
输出
EXIT Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient completeOrder INFO: Done, waiting for server to create order summary... Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
因此,正如我们所看到的,客户端能够添加书籍。并且一旦所有书籍都添加完毕,服务器将响应书籍总数和总价。
gRPC - 双向 RPC
现在让我们看看在使用 gRPC 通信时客户端-服务器流是如何工作的。在这种情况下,客户端将搜索并将书籍添加到购物车。服务器将在每次添加书籍时响应实时购物车价值。
.proto 文件
首先让我们在common_proto_files中定义bookstore.proto文件:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
以下代码块表示服务名称 **"BookStore"** 和可以调用的函数名称 "liveCartValue"。**"liveCartValue"** 函数接收类型为 **"Book"** 的输入,该输入是一个流。函数返回类型为 **"Cart"** 的对象的流。因此,实际上,我们允许客户端以流的方式添加书籍,并且每当添加新书籍时,服务器都会将当前购物车价值响应给客户端。
service BookStore {
rpc liveCartValue (stream Book) returns (stream Cart) {}
}
现在让我们看看这些类型。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
客户端将发送它想要购买的 **"Book"**。它不必是完整的书籍信息;它可以仅仅是书籍的标题。
message Cart {
int32 books = 1;
int32 price = 2;
}
服务器在获取书籍列表后,将返回 **"Cart"** 对象,该对象仅仅是客户端购买的书籍总数和总价。
请注意,我们已经完成了 Maven 设置,用于自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目。
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在
Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务于上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerBothStreaming {
private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
int cartValue = 0;
@Override
public void onNext(Book book) {
logger.info("Searching for book with titlestarting with: " + book.getName());
for (Entry<String, Book> bookEntry :bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding tocart:....");
bookCart.add(bookEntry.getValue());
cartValue +=bookEntry.getValue().getPrice();
}
}
logger.info("Updating cart value...");
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
logger.info("Order completed");
responseObserver.onCompleted();
}
};
}
}
}
上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:
从 **main** 方法开始,我们在指定的端口上创建一个 gRPC 服务器。
但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中是 **BookStoreImpl**
服务实例需要提供 **proto** 文件中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。
现在,鉴于这是服务器和客户端流的情况,服务器将在客户端添加书籍时获得一个 **Books** 列表(在 **proto** 文件中定义)。因此,服务器返回一个自定义流观察器。此流观察器实现了当找到新 Book 时以及当流关闭时会发生什么。
当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车中并使用响应观察器返回购物车价值。在流式传输的情况下,服务器不会等待所有有效的书籍可用。
当客户端完成添加书籍时,将调用流观察器的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加 **Books** 时想要执行的操作,即声明它已完成接收客户端订单。
最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。
设置 gRPC 客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientBothStreaming {
private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
private final BookStoreStub stub;
private boolean serverIntermediateResponseCompleted = true;
private boolean serverResponseCompleted = false;
StreamObserver<Book> streamClientSender;
public BookStoreClientBothStreaming(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver>Cart< getServerResponseObserver(){
StreamObserver>Cart< observer = new StreamObserver<Cart>(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" +
"\nTotal number of Books:" + cart.getBooks()+
"\nTotal Order Value:" cart.getPrice());
serverIntermediateResponseCompleted = true;
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response fromServer: " + t);
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
serverResponseCompleted = true;
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
streamClientSender =stub.liveCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create ordersummary...");
if(streamClientSender != null); {
streamClientSender.onCompleted();
}
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientBothStreaming client = new
BookStoreClientBothStreaming(channel);
String bookName = "";
while(true) {
if(client.serverIntermediateResponseCompleted ==true) {
System.out.println("Type book name to beadded to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.serverIntermediateResponseCompleted = false;
client.addBook(bookName);
Thread.sleep(500);
}
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
以上代码启动一个 gRPC 客户端并连接到指定端口上的服务器,并调用我们在 **proto** 文件中编写的函数和服务。让我们一起浏览以上代码 -
从 **main** 方法开始,我们接受要添加到购物车的书籍名称。一旦所有书籍都将要添加,用户预计会打印“EXIT”。
我们为与服务器的 gRPC 通信设置了一个 Channel。
接下来,我们使用通道创建一个非阻塞存根。在这里,我们选择计划调用其函数的服务 **"BookStore"**。
然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中,是 **Book**,并添加我们希望服务器添加的标题。
但是鉴于这是服务器和客户端流的情况,我们首先为服务器创建一个 **流观察器**。此服务器流观察器列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**。
并且使用存根,我们也获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的书籍数据。
并且一旦我们的订单完成,我们确保客户端流观察器已关闭。这告诉服务器关闭流并执行清理。
最后,我们关闭 Channel 以避免任何资源泄漏。
因此,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动 gRPC 服务器。
客户端通过向服务器通知它们来添加书籍流。
服务器在其商店中搜索书籍并将其添加到购物车。
每次添加书籍时,服务器都会告诉客户端购物车价值。
当客户端完成订购时,服务器和客户端都会关闭流。
现在我们已经定义了我们的 **proto** 文件,编写了我们的服务器和客户端代码,让我们现在执行此代码并查看实际情况。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出 -
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
此输出表示服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientBothStreaming
让我们向我们的客户端添加一本书。
Jul 24, 2021 7:21:45 PM com.tp.bookstore.BookStoreClientBothStreaming main Type book name to be added to the cart.... Great Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Gr Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 1 Total Order Value: 300
因此,正如我们所看到的,我们得到了订单的当前购物车价值。现在让我们向我们的客户端再添加一本书。
Type book name to be added to the cart.... Passage Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Pa Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
一旦我们添加了书籍并输入“EXIT”,客户端将关闭。
Type book name to be added to the cart.... EXIT Jul 24, 2021 7:21:59 PM com.tp.bookstore.BookStoreClientBothStreaming completeOrder INFO: Done, waiting for server to create order summary...
因此,正如我们所看到的,客户端能够添加书籍。并且随着书籍的添加,服务器将响应当前购物车价值。
gRPC - 客户端调用
gRPC 客户端支持两种类型的客户端调用,即客户端如何调用服务器。以下是两种方式 -
阻塞式客户端调用
异步客户端调用
在本章中,我们将逐一查看它们。
阻塞式客户端调用
gRPC 支持阻塞式客户端调用。这意味着,一旦客户端对服务进行调用,客户端将不会继续执行其余代码,直到它从服务器收到响应。请注意,对于单向调用和服务器流式调用,阻塞式客户端调用是可能的。
请注意,对于单向调用和服务器流式调用,阻塞式客户端调用是可能的。
这是一个单向阻塞式客户端调用的示例。
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
在以上示例中,我们有,
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
这意味着我们将使用阻塞式 RPC 调用。
然后,我们有,
BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; response = blockingStub.first(request);
在这里,我们使用 **blockingStub** 调用 RPC **method first()** 来获取书籍详细信息。
类似地,对于服务器流,我们可以使用阻塞式存根 -
logger.info("Querying for book with author: " + author);
BookSearch request =
BookSearch.newBuilder().setAuthor(author).build();
Iterator<Book> response;
try {
response = blockingStub.searchByAuthor(request);
while(response.hasNext()) {
logger.info("Found book: " + response.next());
}
在这里,我们调用 RPC 方法 **searchByAuthor** 方法,并迭代响应,直到服务器流结束。
非阻塞式客户端调用
gRPC 支持非阻塞式客户端调用。这意味着,当客户端对服务进行调用时,它不需要等待服务器响应。为了处理服务器响应,客户端可以简单地传递观察器,该观察器指示在收到响应时该做什么。请注意,对于单向调用和流式调用,非阻塞式客户端调用是可能的。但是,我们将专门关注服务器流式调用的情况,以将其与阻塞式调用进行比较。
请注意,对于单向调用和流式调用,非阻塞式客户端调用是可能的。但是,我们将专门关注服务器流式调用的情况,以将其与阻塞式调用进行比较。
这是一个服务器流式非阻塞式客户端调用的示例。
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientServerStreamingNonBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName());
private final BookStoreGrpc.BookStoreStub nonBlockingStub;
public BookStoreClientServerStreamingNonBlocking(Channelchannel) {
nonBlockingStub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Book> getServerResponseObserver(){
StreamObserver<Book> observer = new
StreamObserver<Book>(){
@Override
public void onNext(Book book) {
logger.info("Server returned following book: " +book);
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response fromServer: " + t);
}
@Override
public void onCompleted() {
logger.info("Server returned following book: " + book);
}
};
return observer;
}
public void getBook(String author) {
logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
nonBlockingStub.searchByAuthor(request,getServerResponseObserver());
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
return;
}
}
public static void main(String[] args) throws Exception {
String authorName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientServerStreamingNonBlocking client = new
BookStoreClientServerStreamingNonBlocking(channel);
client.getBook(authorName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
正如我们在以上示例中看到的,
public BookStoreClientUnaryNonBlocking(Channel channel) {
nonBlockingStub = BookStoreGrpc.newStub(channel);
}
它定义了存根是非阻塞式的。类似地,以下代码用于处理我们从服务器接收到的响应。一旦服务器发送响应,我们就记录输出。
public StreamObserver<Book> getServerResponseObserver(){
StreamObserver<Book> observer = new
StreamObserver<Book>(){
....
....
return observer;
}
以下 gRPC 调用是非阻塞式调用。
logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
}
这就是我们确保客户端不需要等到服务器完成 **searchByAuthor** 执行的方式。这将由流观察器对象在服务器返回 Book 对象时直接处理。
gRPC - 超时和取消
gRPC 支持为请求分配超时。这是一种执行请求取消的方法。它有助于避免为客户端和服务器使用资源,以执行其结果对客户端无用的请求。
请求超时
gRPC 支持为客户端和服务器指定超时。
客户端可以在运行时指定它希望等待多长时间,然后取消请求。
服务器也可以在其端检查请求是否需要处理,或者客户端是否已放弃请求。
让我们来看一个例子,客户端期望在2秒内收到响应,但服务器花费了更长的时间。所以,这是我们的服务器代码。
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnaryTimeout {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerUnaryTimeout greetServer = new
BookeStoreServerUnaryTimeout();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends
BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " +searchQuery.getName());
logger.info("This may take more time...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> matchingBookTitles = bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
在上面的代码中,服务器搜索客户端提供的书籍标题。我们添加了一个模拟休眠,以便我们可以看到请求超时。
这是我们的客户端代码
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlockingTimeout {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
public BookStoreClientUnaryBlockingTimeout(Channel channel){
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request =BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
return;
}
logger.info("Got following book from server: " +response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上面的代码使用标题调用服务器进行搜索。但更重要的是,它为gRPC调用提供了2秒的超时时间。
现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
我们将看到以下输出 -
输出
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start INFO: Server started, listening on 50051
以上输出表明服务器已启动。
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: Searching for book with title: Great Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: This may take more time...
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
我们将得到以下输出:
输出
Jul 31, 2021 12:29:34 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED,
description=deadline exceeded after 1.970455800s.
[buffered_nanos=816522700,
remote_addr=localhost/127.0.0.1:50051], cause=null}
因此,正如我们所看到的,客户端在2秒内没有收到响应,因此它取消了请求并将其称为超时,即DEADLINE_EXCEEDED
请求取消
gRPC支持客户端和服务器端都取消请求。客户端可以在运行时指定它希望在取消请求之前等待的时间。服务器也可以在其端检查请求是否需要处理,或者客户端是否已经放弃了请求。
让我们看一个客户端流式传输的例子,其中客户端调用取消。所以,这是我们的服务器代码
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book>
totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book) {
logger.info("Searching for book with titlestarting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream:" + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
}
此服务器代码是客户端流式传输的一个简单示例。服务器简单地跟踪客户端想要的书籍,最后,它提供订单的总购物车价值。
但是这里关于请求取消没有什么特别之处,因为这是客户端将调用的。所以,让我们看看客户端代码。
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookStoreClientStreamingClientCancelation {
private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
private final BookStoreStub stub;
StreamObserver<Book> streamClientSender;
private CancellableContext withCancellation;
public BookStoreClientStreamingClientCancelation(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver>Cart<(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:"
+ "\nTotal number of Books: "
+ cart.getBooks()
+ "\nTotal Order Value: "
+ cart.getPrice());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from Server: " + t);
}
@Override
public void onCompleted() {
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
withCancellation = Context.current().withCancellation();
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public void cancelOrder() {
withCancellation.cancel(null);
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
if(bookName.equals("CANCEL")) {
client.cancelOrder();
break;
}
client.addBook(bookName);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
所以,如果我们看到上面的代码,以下行定义了一个启用了取消的上下文。
withCancellation = Context.current().withCancellation();
这是当用户输入取消时将调用的方法。这将取消订单,并让服务器知道。
public void cancelOrder() {
withCancellation.cancel(null);
}
现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出:
输出
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
我们将得到以下输出:
输出
Type book name to be added to the cart.... Great Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... CANCEL Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
我们将在服务器日志中获得以下数据:
INFO: Searching for book with title starting with: Great Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onNext INFO: Found book, adding to cart:.... Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onError INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
因此,正如我们所看到的,客户端启动了对其发送到服务器的请求的取消。服务器也收到了关于取消的通知。
gRPC - 发送/接收元数据
gRPC支持发送元数据。元数据基本上是我们想要发送的一组数据,这些数据不是业务逻辑的一部分,同时进行gRPC调用。
让我们看看以下两种情况:
- 客户端发送元数据,服务器读取它。
- 服务器发送元数据,客户端读取它。
我们将逐一介绍这两种情况。
客户端发送元数据
如前所述,gRPC支持客户端发送服务器可以读取的元数据。gRPC支持扩展客户端和服务器拦截器,这些拦截器可分别用于写入和读取元数据。让我们举一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:
让我们举一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:
示例
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookStoreClientUnaryBlockingMetadata {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientUnaryBlockingMetadata(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
static class BookClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
try {
response = blockingStub.withOption(metaDataKey, "bar").first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
try {
BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
这里有趣的部分是拦截器。
static class BookClientInterceptor implements ClientInterceptor{
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT>responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
我们拦截客户端发出的任何调用,然后在进一步调用之前向其添加主机名元数据。
服务器读取元数据
现在,让我们看看读取此元数据的服务器代码:
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerMetadata {
private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
}
private Server server;
class BookServerInterceptor implements ServerInterceptor{
@Override
public <ReqT, RespT> Listener<ReqT>
interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
logger.info("Recieved following metadata: " + headers);
return next.startCall(call, headers);
}
}
private void start() throws IOException {
int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " + searchQuery.getName());
List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
同样,这里有趣的部分是拦截器。
class BookServerInterceptor implements ServerInterceptor{
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
logger.info("Recieved following metadata: " + headers);
return next.startCall(call, headers);
}
}
我们拦截任何传入服务器的调用,然后在实际方法处理调用之前记录元数据。
客户端-服务器调用
现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata
我们将看到以下输出 -
输出
Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great
我们将看到以下输出 -
输出
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Querying for book with title: Great Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli entInterceptor$1 start INFO: Added metadata Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Got following book from server: name: "Great Gatsby" author: "Scott Fitzgerald" price: 300
我们将在服务器日志中获得以下数据:
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept or interceptCall INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip) Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first INFO: Searching for book with title: Great
正如我们所看到的,服务器能够读取元数据:hostname=MY_HOST,该元数据由客户端添加。