gRPC - 快速指南



gRPC - 简介

在深入了解 gRPC 之前,让我们先简单了解一下远程过程调用,也就是 gRPC 所做的事情。

什么是远程过程调用?

远程过程调用是看起来像普通/本地函数调用一样的函数调用,但不同之处在于远程函数调用通常在不同的机器上执行。但是,对于编写代码的开发人员来说,函数调用和远程调用之间的区别很小。这些调用通常遵循客户端-服务器模型,其中执行调用的机器充当服务器。

为什么我们需要远程过程调用?

远程过程调用提供了一种在另一台机器上执行代码的方法。在大型、庞大的产品中,这变得至关重要,因为单台机器无法承载产品正常运行所需的所有代码。

在微服务架构中,应用程序被分解成小的服务,这些服务通过消息队列和 API 相互通信。所有这些通信都发生在网络上,不同的机器/节点根据它们所承载的服务提供不同的功能。因此,在分布式环境中工作时,创建远程过程调用成为一个关键方面。

为什么选择 gRPC?

Google 远程过程调用 (gRPC) 提供了一个执行远程过程调用的框架。但是,还有一些其他库和机制可以在远程机器上执行代码。那么,是什么让 gRPC 如此特别呢?让我们来了解一下。

  • 语言无关性 - gRPC 在内部使用 Google Protocol Buffer。因此,可以使用多种语言,例如 Java、Python、Go、Dart 等。Java 客户端可以进行过程调用,而使用 Python 的服务器可以做出响应,从而有效地实现语言无关性。

  • 高效的数据压缩 - 在微服务环境中,考虑到网络上会发生多次通信,因此我们发送的数据尽可能简洁至关重要。我们需要避免任何多余的数据,以确保数据快速传输。鉴于 gRPC 在内部使用 Google Protocol Buffer,因此它可以利用此功能。

  • 高效的序列化和反序列化 - 在微服务环境中,考虑到网络上会发生多次通信,因此我们尽可能快速地序列化和反序列化数据至关重要。鉴于 gRPC 在内部使用 Google Protocol Buffer,因此它可以确保快速序列化和反序列化数据。

  • 易于使用 - gRPC 已经拥有一个库和插件,可以自动生成过程代码(我们将在后续章节中看到)。对于简单的用例,它可以像本地函数调用一样使用。

gRPC 与使用 JSON 的 REST 的比较

让我们看看其他通过网络传输数据的方式与 Protobuf 相比如何。

功能 gRPC 使用 JSON/XML 的 HTTP
语言无关性
HTTP 版本 HTTP/2 HTTP 1.1
指定域模式 .proto 文件(Google Protocol Buffer)
序列化数据大小 最小 高(XML 更高)
人类可读性 否,因为它使用单独的编码模式 是,因为它使用基于文本的格式
序列化速度 最快 较慢(XML 最慢)
数据类型支持 更丰富。支持复杂数据类型,如 Any、oneof 等。 支持基本数据类型
支持演变模式

gRPC - 设置

Protoc 设置

请注意,此设置仅适用于 Python。对于 Java,所有这些都由 Maven 文件处理。让我们安装“proto”二进制文件,我们将使用它来自动生成“.proto”文件的代码。二进制文件可以在 https://github.com/protocolbuffers/protobuf/releases/ 中找到。根据操作系统选择正确的二进制文件。我们将在 Windows 上安装 proto 二进制文件,但 Linux 的步骤差别不大。

安装完成后,确保您可以通过命令行访问它 -

protoc --version
libprotoc 3.15.6

这意味着 Protobuf 已正确安装。现在,让我们转到项目结构。

我们还需要设置 gRPC 代码生成所需的插件。

对于 Python,我们需要执行以下命令 -

python -m pip install grpcio
python -m pip install grpcio-tools

它将安装所有必需的二进制文件并将它们添加到路径中。

项目结构

以下是我们将拥有的整体项目结构 -

Project Structure

与各个语言相关的代码进入各自的目录。我们将有一个单独的目录来存储我们的 proto 文件。并且,以下是我们将用于 Java 的项目结构 -

Project Structure

项目依赖项

现在我们已经安装了protoc,我们可以使用protoc从 proto 文件自动生成代码。让我们先创建一个 Java 项目。

以下是我们将用于 Java 项目的 Maven 配置。请注意,它还包含 Protobuf 所需的库。

示例

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
   http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.grpc.point</groupId>
   <artifactId>grpc-point</artifactId>
   <version>1.0</version>
   <packaging>jar</packaging>

   <properties>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
   </properties>

   <dependencies>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-netty-shaded</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-protobuf</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-stub</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency> <!-- necessary for Java 9+ -->
         <groupId>org.apache.tomcat</groupId>
         <artifactId>annotations-api</artifactId>
         <version>6.0.53</version>
         <scope>provided</scope>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.slf4j/slf4jsimple-->
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.30</version>
      </dependency>
   </dependencies>
      
   <build>
      <extensions>
         <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.6.2</version>
            </extension>
      </extensions>
      <plugins>
         <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.1</version>
            <executions>
               <execution>
                  <id>test</id>
                  <phase>generate-sources</phase>
                  <goals>
                     <goal>add-source</goal>
                  </goals>
                  <configuration>
                     <sources>
                        <source>${basedir}/target/generated-sources</source>
                     </sources>
                  </configuration>
               </execution>
            </executions>
         </plugin>
         <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
               <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
               <pluginId>grpc-java</pluginId>
               <pluginArtifact>io.grpc:protoc-gen-grpcjava:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
               <protoSourceRoot>../common_proto_files</protoSourceRoot>
            </configuration>
            <executions>
               <execution>
                  <goals>
                     <goal>compile</goal>
                     <goal>compile-custom</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <configuration>
            </configuration>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals>
                     <goal>shade</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>
</project>

gRPC - 使用 Java 的 Hello World 应用

现在让我们创建一个基本的类似“Hello World”的应用程序,它将使用 gRPC 和 Java。

.proto 文件

首先让我们在common_proto_files中定义“greeting.proto”文件 -

syntax = "proto3";
option java_package = "com.tp.greeting";
service Greeter {
   rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
   string greeting = 1;
   string name = 2;
}
message ServerOutput {
   string message = 1;
}

现在让我们仔细看看这个代码块,并了解每一行的作用 -

syntax = "proto3";

此处的“syntax”表示我们正在使用的 Protobuf 版本。因此,我们使用最新版本 3,因此模式可以使用所有对版本 3 有效的语法。

package tutorial;

此处的包用于冲突解决,例如,如果我们有多个名称相同的类/成员。

option java_package = "com.tp.greeting";

此参数特定于 Java,即从“.proto”文件自动生成代码的包。

service Greeter {
   rpc greet(ClientInput) returns (ServerOutput) {}
}

此块表示服务“Greeter”的名称和可以调用的函数名称“greet”。“greet”函数接收类型为“ClientInput”的输入并返回类型为“ServerOutput”的输出。现在让我们看看这些类型。

message ClientInput {
   string greeting = 1;
   string name = 2;
}

在上面的代码块中,我们定义了ClientInput,它包含两个属性“greeting”“name”,它们都是字符串。客户端应该将类型为“ClientInput”的对象发送到服务器。

message ServerOutput {
   string message = 1;
}

在上面的代码块中,我们定义了,给定一个“ClientInput”,服务器将返回“ServerOutput”,它将包含一个名为“message”的属性。服务器应该将类型为“ServerOutput”的对象发送到客户端。

请注意,我们已经完成了 Maven 设置以自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目 -

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.greeting
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.greeting

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以服务于调用这些函数的服务器。

让我们编写服务器代码来服务于上述函数,并将其保存在com.tp.grpc.GreetServer.java中 -

示例

package com.tp.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ClientInput;
import com.tp.greeting.Greeting.ServerOutput;

public class GreetServer {
   private static final Logger logger = Logger.getLogger(GreetServer.class.getName());
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
       
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
      @Override
      public void greet(ClientInput req, StreamObserver<ServerOutput> responseObserver) {
         logger.info("Got request from client: " + req);
         ServerOutput reply = ServerOutput.newBuilder().setMessage(
            "Server says " + "\"" + req.getGreeting() + " " + req.getName() + "\""
         ).build();
         responseObserver.onNext(reply);
         responseObserver.onCompleted();
      }
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final GreetServer greetServer = new GreetServer();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
} 

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们在指定的端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中,是Greeter服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中,是GreeterImpl

  • 服务实例需要提供“.proto”文件中存在的函数/方法的实现,即在我们的例子中,是greet方法。

  • 该方法期望一个类型与“.proto”文件中定义的类型相同的对象,即在我们的例子中,是ClientInput

  • 该方法处理上述输入,执行计算,然后应该在“.proto”文件中返回提到的输出,即在我们的例子中,是ServerOutput

  • 最后,我们还有一个shutdown钩子,以确保在完成代码执行后服务器能够干净地关闭。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写客户端代码来调用上述函数,并将其保存在com.tp.grpc.GreetClient.java中 -

示例

package com.tp.grpc;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class GreetClient {
   private static final Logger logger = Logger.getLogger(GreetClient.class.getName());
   private final GreeterGrpc.GreeterBlockingStub blockingStub;
   
   public GreetClient(Channel channel) {
      blockingStub = GreeterGrpc.newBlockingStub(channel);
   }
   public void makeGreeting(String greeting, String username) {
      logger.info("Sending greeting to server: " + greeting + " for name: " + username);
      ClientInput request = ClientInput.newBuilder().setName(username).setGreeting(greeting).build();
      logger.info("Sending to server: " + request);
      ServerOutput response;
      try {
         response = blockingStub.greet(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following from the server: " + response.getMessage());
   }
   
   public static void main(String[] args) throws Exception {
      String greeting = args[0];
      String username = args[1];
      String serverAddress = "localhost:50051";
	   ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         GreetClient client = new GreetClient(channel);
         client.makeGreeting(greeting, username);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们接受两个参数,即namegreeting

  • 我们为 gRPC 与服务器的通信设置了一个 Channel。

  • 接下来,我们使用创建的 Channel 创建一个阻塞式存根。在这里,我们拥有服务“Greeter”,我们计划调用它的函数。存根只不过是一个包装器,它隐藏了远程调用的复杂性,使调用者无需关心。

  • 然后,我们简单地创建“.proto”文件中定义的预期输入,即在我们的例子中,是ClientInput

  • 我们最终进行调用并等待服务器的响应。

  • 最后,我们关闭 Channel 以避免任何资源泄漏。

因此,这就是我们的客户端代码。

客户端服务器调用

现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际效果。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetServer

我们将看到以下输出 -

输出

Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start
INFO: Server started, listening on 50051

上述输出意味着服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetClient 
Hello Jane

我们将看到以下输出 -

输出

Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet
INFO: Sending greeting to server: Hello for name: Jane
Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet
INFO: Sending to server: greeting: "Hello"
name: "Jane"

Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetClient greet
INFO: Got following from the server: Server says "Hello Jane"

现在,如果我们打开服务器日志,我们将看到以下内容 -

Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start
INFO: Server started, listening on 50051
Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetServer$GreeterImpl 
greet
INFO: Got request from client: greeting: "Hello"
name: "Jane"

因此,客户端能够按预期调用服务器,并且服务器通过向客户端发送问候进行了响应。

gRPC - 使用 Python 的 Hello World 应用

现在让我们创建一个基本的类似“Hello World”的应用程序,它将使用 gRPC 和 Python。

.proto 文件

首先让我们在common_proto_files中定义greeting.proto文件 -

syntax = "proto3";

service Greeter {
   rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
   string greeting = 1;
   string name = 2;
}
message ServerOutput {
   string message = 1;
}

现在让我们仔细看看上面代码块中的每一行 -

syntax = "proto3";

此处的“syntax”表示我们正在使用的 Protobuf 版本。因此,我们使用最新版本 3,因此模式可以使用所有对版本 3 有效的语法。

package tutorial;

此处的package用于冲突解决,例如,如果我们有多个名称相同的类/成员。

service Greeter {
   rpc greet(ClientInput) returns (ServerOutput) {}
}

此块表示服务“Greeter”的名称和可以调用的函数名称“greet”。“greet”函数接收类型为“ClientInput”的输入并返回类型为“ServerOutput”的输出。现在让我们看看这些类型。

message ClientInput {
   string greeting = 1;
   string name = 2;
}

在上面的代码块中,我们定义了ClientInput,它包含两个属性“greeting”和“name”,它们都是字符串。客户端应该将类型为“ClientInput”的对象发送到服务器。

message ServerOutput {
   string message = 1;
}

这里,我们还定义了,给定一个“ClientInput”,服务器将返回一个包含单个属性“message”的“ServerOutput”。服务器应该将类型为“ServerOutput”的对象发送到客户端。

现在,让我们为 Protobuf 类和 gRPC 类生成底层代码。为此,我们需要执行以下命令:

python -m grpc_tools.protoc -I ..\common_proto_files\ --
python_out=../python --grpc_python_out=. greeting.proto

但是,请注意,要执行该命令,我们需要安装教程的设置部分中提到的正确依赖项。

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -

Protobuf class code: python/greeting_pb2.py
Protobuf gRPC code: python/greeting_pb2_grpcpb2.py

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。

让我们编写我们的服务器代码来服务上述函数,并将其保存在server.py中:

示例

from concurrent import futures

import grpc
import greeting_pb2
import greeting_pb2_grpc

class Greeter(greeting_pb2_grpc.GreeterServicer):
   def greet(self, request, context):
      print("Got request " + str(request))
      return greeting_pb2.ServerOutput(message='{0} {1}!'.format(request.greeting, request.name))
	  
def server():
   server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
   greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
   server.add_insecure_port('[::]:50051')
   print("gRPC starting")
   server.start()
   server.wait_for_termination()
server()

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们在指定的端口上创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中,是Greeter服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为Greeter

  • 服务实例需要提供.proto文件中存在的 method/function 的实现,即在我们的例子中为greet方法。

  • 该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是request

  • 该方法处理上述输入,进行计算,然后应该返回.proto文件中提到的输出,即在我们的例子中为ServerOutput

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在client.py中:

示例

import grpc

import greeting_pb2
import greeting_pb2_grpc

def run():
   with grpc.insecure_channel('localhost:50051') as channel:
      stub = greeting_pb2_grpc.GreeterStub(channel)
      response = stub.greet(greeting_pb2.ClientInput(name='John', greeting = "Yo"))
   print("Greeter client received following from server: " + response.message)   
run()

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 然后,我们使用 channel 创建一个stub。在这里,我们使用服务“Greeter”,我们计划调用其函数。stub 只是一个包装器,它隐藏了远程调用对调用者的复杂性。

  • 然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为ClientInput。我们硬编码了两个参数,即namegreeting

  • 我们最终进行调用并等待服务器的结果。

因此,这就是我们的客户端代码。

客户端服务器调用

现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

python .\server.py

输出

我们将得到以下输出:

gRPC starting

上述输出表示服务器已启动。

现在,让我们启动客户端。

python .\client.py

我们将看到以下输出 -

输出

Greeter client received following from server: Yo John!

现在,如果我们打开服务器日志,我们将看到以下数据:

gRPC starting
Got request greeting: "Yo"
name: "John"

因此,正如我们所看到的,客户端能够按预期调用服务器,并且服务器响应了向客户端回送问候。

gRPC - 一元 gRPC

我们现在将了解 gRPC 框架支持的各种类型的通信。我们将使用书店的例子,客户可以在其中搜索和下单送书。

让我们看看一元 gRPC 通信,我们让客户端搜索标题并随机返回一个与查询的标题匹配的书籍。

.proto 文件

首先让我们在 common_proto_files 中定义 bookstore.proto 文件:

syntax = "proto3";
option java_package = "com.tp.bookstore";

service BookStore {
   rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

现在让我们仔细看看上面代码块中的每一行。

syntax = "proto3";

语法

这里的“语法”表示我们使用的 Protobuf 版本。我们使用最新的版本 3,因此该模式可以使用所有对版本 3 有效的语法。

package tutorial;

此处的package用于冲突解决,例如,如果我们有多个名称相同的类/成员。

option java_package = "com.tp.bookstore";

此参数特定于 Java,即从 .proto 文件自动生成代码的包。

service BookStore {
   rpc first (BookSearch) returns (Book) {}
}

这表示服务的名称"BookStore"和可以调用的函数名称"first""first"函数接收类型为"BookSearch"的输入并返回类型为"Book"的输出。因此,实际上,我们让客户端搜索标题并返回一个与查询的标题匹配的书籍。

现在让我们看看这些类型。

message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

在上面的代码块中,我们定义了 BookSearch,它包含诸如name、authorgenre之类的属性。客户端应该将类型为"BookSearch"的对象发送到服务器。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

这里,我们还定义了,给定一个“BookSearch”,服务器将返回一个包含书籍属性以及书籍价格的“Book”。服务器应该将类型为“Book”的对象发送到客户端。

请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。

让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerUnary.java中:

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->
            title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());

         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们在指定的端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl

  • 服务实例需要提供.proto文件中存在的 method/function 的实现,即在我们的例子中为first方法。

  • 该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是BookSearch

  • 该方法在可用的 bookMap 中搜索书籍,然后通过调用onNext()方法返回Book。完成后,服务器通过调用onCompleted()宣布它已完成输出。

  • 最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientUnaryBlocking.java中:

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
	
   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
 
   Book response; 
   try {
      response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
	  
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
 
      try {
         BookStoreClientUnaryBlocking client = new 
         BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, 
         TimeUnit.SECONDS);
      }
   }
}

上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:

  • main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。

  • 我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 然后,我们使用 channel 创建一个阻塞 stub。在这里,我们选择服务“BookStore”,我们计划调用其函数。“stub”只是一个包装器,它隐藏了远程调用对调用者的复杂性。

  • 然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为BookSearch,并添加我们希望服务器搜索的标题名称。

  • 我们最终进行调用并等待服务器的结果。

  • 最后,我们关闭 Channel 以避免任何资源泄漏。

因此,这就是我们的客户端代码。

客户端服务器调用

总而言之,我们要做的是:

  • 启动 gRPC 服务器。

  • 客户端查询服务器以获取具有给定名称/标题的书籍。

  • 服务器在其商店中搜索书籍。

  • 服务器然后响应书籍及其其他属性。

现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerUnary

我们将看到以下输出 -

输出

Jul 03, 2021 7:21:58 PM 
com.tp.bookstore.BookeStoreServerUnary start
INFO: Server started, listening on 50051

上述输出表示服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"

我们将看到以下输出 -

输出

Jul 03, 2021 7:22:03 PM 
com.tp.bookstore.BookStoreClientUnaryBlocking getBook
INFO: Querying for book with title: To Kill

Jul 03, 2021 7:22:04 PM 
com.tp.bookstore.BookStoreClientUnaryBlocking getBook
INFO: Got following book from server: name: "To Kill 

MockingBird"
author: "Harper Lee"
price: 400

因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。

gRPC - 服务器端流式 RPC

现在让我们讨论在使用 gRPC 通信时服务器流是如何工作的。在这种情况下,客户端将搜索具有给定作者的书籍。假设服务器需要一些时间才能遍历所有书籍。服务器不是在遍历完所有书籍后等待提供所有书籍,而是会以流式方式提供书籍,即一旦找到一本就提供一本。

.proto 文件

首先让我们在 common_proto_files 中定义 bookstore.proto 文件:

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc first (BookSearch) returns (stream Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

以下代码块表示服务的名称"BookStore"和可以调用的函数名称"searchByAuthor"。"searchByAuthor"函数接收类型为"BookSearch"的输入并返回类型为"Book"的流。因此,实际上,我们让客户端搜索标题并返回一个与查询的作者匹配的书籍。

service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}

现在让我们看看这些类型。

message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

这里,我们定义了BookSearch,它包含一些属性,如nameauthorgenre。客户端应该将类型为"BookSearch"的对象发送到服务器。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

我们还定义了,给定一个"BookSearch",服务器将返回一个包含书籍属性以及书籍价格的"Book"流。服务器应该发送一个"Book"流。

请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以服务调用这些函数的服务器。

让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerStreaming.java中:

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with author: " + searchQuery.getAuthor());
         for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
            try {
               logger.info("Going through more books....");
               Thread.sleep(5000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
    
            if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
               logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");

               responseObserver.onNext(bookEntry.getValue());
            } 
         }
         responseObserver.onCompleted();
      }
   }
}

上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:

  • main方法开始,我们在指定的端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl

  • 服务实例需要提供.proto 文件中存在的 method/function 的实现,即在我们的例子中为searchByAuthor方法。

  • 该方法期望一个类型为 .proto 文件中定义的对象,即对我们来说是BookSearch

  • 请注意,我们添加了一个 sleep 来模拟遍历所有书籍的操作。在流式传输的情况下,服务器不会等待所有搜索到的书籍都可用。它在书籍可用时通过使用onNext()调用返回书籍。

  • 当服务器完成请求时,它通过调用onCompleted()关闭通道。

  • 最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中:

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
	public BookStoreClientServerStreamingBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook((String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder()..setAuthor(author).build();
      Iterator<Book> response; 
      try {
         response = blockingStub.searchByAuthor(request);
         while(response.hasNext()) {
            logger.info("Found book: " + response.next());
         }
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
	   
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
        .usePlaintext()
        .build();
 
      try {
         BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们接受一个参数,即我们要搜索的书籍的title

  • 我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 然后,我们使用 channel 创建一个阻塞 stub。在这里,我们选择服务“BookStore”,我们计划调用其函数。

  • 然后,我们简单地创建 proto 文件中定义的预期输入,即在我们的例子中为 BookSearch,并添加我们希望服务器搜索的标题。

  • 我们最终进行调用并在有效的 Books 上获取一个迭代器。当我们迭代时,我们得到服务器提供的相应 Books。

  • 最后,我们关闭 Channel 以避免任何资源泄漏。

因此,这就是我们的客户端代码。

客户端服务器调用

总而言之,我们要做的是:

  • 启动 gRPC 服务器。

  • 客户端查询服务器以获取具有给定作者的书籍。

  • 服务器在其商店中搜索书籍,这是一个耗时的过程。

  • 服务器在找到符合给定条件的书籍时做出响应。服务器不会等待所有有效的书籍都可用。它在找到一本时立即发送输出。然后重复此过程。

现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerStreaming

我们将看到以下输出 -

输出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

上述输出表示服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"

我们将看到以下输出 -

输出

Jul 03, 2021 10:40:31 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Querying for book with author: Har

Jul 03, 2021 10:40:37 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "Go Set a Watchman"
author: "Harper Lee"
price: 700

Jul 03, 2021 10:40:42 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。但更重要的是,客户端在不同的时间戳获取了第一本书和第二本书,即大约 5 秒的间隔。

gRPC - 客户端流式 RPC

现在让我们看看在使用 gRPC 通信时客户端流是如何工作的。在这种情况下,客户端将搜索并将书籍添加到购物车。一旦客户端完成添加所有书籍,服务器将向客户端提供结账购物车值。

.proto 文件

首先让我们在common_proto_files中定义bookstore.proto文件:

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

这里,以下代码块表示服务名称 **"BookStore"** 和可以调用的函数名称 **"totalCartValue"**。“totalCartValue” 函数接收类型为 **"Book"** 的输入,该输入是一个流。函数返回类型为 "Cart" 的对象。因此,实际上,我们允许客户端以流的方式添加书籍,一旦客户端完成,服务器就会向客户端提供购物车总价值。

service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}

现在让我们看看这些类型。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客户端将发送它想要购买的 **"Book"**。它可能不是完整的书籍信息;它可以仅仅是书籍的标题。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

服务器在获取书籍列表后,将返回 **"Cart"** 对象,该对象仅仅是客户端购买的书籍总数和总价。

请注意,我们已经完成了 Maven 的设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目:

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 **proto** 文件,让我们设置一个可以调用这些函数的服务器。

让我们编写我们的服务器代码来服务于上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class  BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new  BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) 
            logger.info("Searching for book with title starting with: " + book.getName());
            for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
               if(bookEntry.getValue().getName().startsWith(book.getName())){
                  logger.info("Found book, adding to cart:....");
                  bookCart.add(bookEntry.getValue());
               }
            }
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading book stream: " + t);
         }
         @Override
         public void onCompleted() {
            int cartValue = 0;
            for (Book book : bookCart) {
               cartValue += book.getPrice();
            }
            responseObserver.onNext(Cart.newBuilder()
               .setPrice(cartValue)
               .setBooks(bookCart.size()).build());
            responseObserver.onCompleted();
         }
      };
 
   }
}          

上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:

  • main方法开始,我们在指定的端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为BookStoreImpl

  • 服务实例需要提供 **.proto 文件** 中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。

  • 现在,鉴于这是客户端流的情况,服务器将在客户端添加书籍时获得一个 Book 列表(在 **proto** 文件中定义)。因此,服务器返回一个 **自定义流观察器**。此流观察器实现了当找到新 Book 时以及当流关闭时会发生什么。

  • 当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车中。在流式传输的情况下,服务器不会等待所有可用的书籍。

  • 当客户端完成添加书籍时,将调用流观察器的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加书籍时想要发送的内容,即它将 Cart 对象返回给客户端。

  • 最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientStreamingClient {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
   private final BookStoreStub stub;
	private boolean serverResponseCompleted = false; 
   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientStreamingClient(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() + 
               "\nTotal Order Value:" + cart.getPrice());
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
 
      if(streamClientSender == null) {
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null);
      streamClientSender.onCompleted();
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
	   ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
         String bookName = ""; 
 
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break; 
            }
            client.addBook(bookName);
         }
 
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
 
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上面的代码在指定的端口启动一个 gRPC 服务器,并服务于我们在 proto 文件中编写的函数和服务。让我们遍历上面的代码 -

  • main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。

  • 我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 接下来,我们使用创建的通道创建一个 **非阻塞存根**。在这里,我们选择计划调用其函数的服务 **"BookStore"**。

  • 然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中,是 **Book**,并添加我们希望服务器添加的标题。

  • 但是鉴于这是客户端流的情况,我们首先为服务器创建一个流观察器。此服务器流观察器列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**

  • 并且使用存根,我们也获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的书籍数据。最终,我们进行调用并在有效书籍上获取迭代器。当我们迭代时,我们得到服务器提供的相应书籍。

  • 并且一旦我们的订单完成,我们确保客户端流观察器已关闭。它告诉服务器计算购物车价值并将其作为输出提供。

  • 最后,我们关闭 Channel 以避免任何资源泄漏。

因此,这就是我们的客户端代码。

客户端服务器调用

总而言之,我们要做的是:

  • 启动 gRPC 服务器。

  • 客户端通过向服务器通知它们来添加书籍流。

  • 服务器在其商店中搜索书籍并将其添加到购物车。

  • 当客户端完成订购时,服务器会响应客户端的购物车总价值。

现在,我们已经定义了proto文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际情况。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerClientStreaming

我们将看到以下输出 -

输出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

上述输出表示服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingClient

让我们向我们的客户端添加一些书籍。

Type book name to be added to the cart....
Gr
Jul 24, 2021 5:53:07 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
Pa
Jul 24, 2021 5:53:20 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Passage

Type book name to be added to the cart....

一旦我们添加了书籍并输入“EXIT”,服务器将计算购物车价值,以下是我们获得的输出 -

输出

EXIT
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient completeOrder
INFO: Done, waiting for server to create order summary...
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

因此,正如我们所看到的,客户端能够添加书籍。并且一旦所有书籍都添加完毕,服务器将响应书籍总数和总价。

gRPC - 双向 RPC

现在让我们看看在使用 gRPC 通信时客户端-服务器流是如何工作的。在这种情况下,客户端将搜索并将书籍添加到购物车。服务器将在每次添加书籍时响应实时购物车价值。

.proto 文件

首先让我们在common_proto_files中定义bookstore.proto文件:

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

以下代码块表示服务名称 **"BookStore"** 和可以调用的函数名称 "liveCartValue"。**"liveCartValue"** 函数接收类型为 **"Book"** 的输入,该输入是一个流。函数返回类型为 **"Cart"** 的对象的流。因此,实际上,我们允许客户端以流的方式添加书籍,并且每当添加新书籍时,服务器都会将当前购物车价值响应给客户端。

service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}

现在让我们看看这些类型。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客户端将发送它想要购买的 **"Book"**。它不必是完整的书籍信息;它可以仅仅是书籍的标题。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

服务器在获取书籍列表后,将返回 **"Cart"** 对象,该对象仅仅是客户端购买的书籍总数和总价。

请注意,我们已经完成了 Maven 设置,用于自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目。

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在

Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。

让我们编写我们的服务器代码来服务于上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerBothStreaming {
   private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            int cartValue = 0;
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with titlestarting with: " + book.getName());
               for (Entry<String, Book> bookEntry :bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding tocart:....");
                     bookCart.add(bookEntry.getValue());
                     cartValue +=bookEntry.getValue().getPrice();
                  }
               }
               logger.info("Updating cart value...");

               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               logger.info("Order completed");
               responseObserver.onCompleted();
            }
         };
      }
   }
}

上述代码在指定的端口上启动一个 gRPC 服务器,并为我们在proto文件中编写的函数和服务提供服务。让我们逐步了解上面的代码:

  • 从 **main** 方法开始,我们在指定的端口上创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中为BookStore服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中是 **BookStoreImpl**

  • 服务实例需要提供 **proto** 文件中存在的 method/function 的实现,即在我们的例子中,是 **totalCartValue** 方法。

  • 现在,鉴于这是服务器和客户端流的情况,服务器将在客户端添加书籍时获得一个 **Books** 列表(在 **proto** 文件中定义)。因此,服务器返回一个自定义流观察器。此流观察器实现了当找到新 Book 时以及当流关闭时会发生什么。

  • 当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车中并使用响应观察器返回购物车价值。在流式传输的情况下,服务器不会等待所有有效的书籍可用。

  • 当客户端完成添加书籍时,将调用流观察器的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加 **Books** 时想要执行的操作,即声明它已完成接收客户端订单。

  • 最后,我们还有一个关闭钩子,以确保在我们完成代码执行时服务器能够干净地关闭。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientBothStreaming {
   private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
   private final BookStoreStub stub;
   private boolean serverIntermediateResponseCompleted = true;
   private boolean serverResponseCompleted = false;

   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientBothStreaming(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver>Cart< getServerResponseObserver(){
      StreamObserver>Cart< observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + 
               "\nTotal number of Books:" + cart.getBooks()+ 
               "\nTotal Order Value:" cart.getPrice());

            serverIntermediateResponseCompleted = true;
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
      if(streamClientSender == null) {
         streamClientSender =stub.liveCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create ordersummary...");
      if(streamClientSender != null); {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientBothStreaming client = new
         BookStoreClientBothStreaming(channel);
         String bookName = "";

         while(true) {
            if(client.serverIntermediateResponseCompleted ==true) {
               System.out.println("Type book name to beadded to the cart....");
               bookName = System.console().readLine();
               if(bookName.equals("EXIT")) {
                  client.completeOrder();
                  break;
               }
               client.serverIntermediateResponseCompleted = false;
               client.addBook(bookName);
               Thread.sleep(500);
            }
         }
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
            
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }   
   }
}

以上代码启动一个 gRPC 客户端并连接到指定端口上的服务器,并调用我们在 **proto** 文件中编写的函数和服务。让我们一起浏览以上代码 -

  • 从 **main** 方法开始,我们接受要添加到购物车的书籍名称。一旦所有书籍都将要添加,用户预计会打印“EXIT”。

  • 我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 接下来,我们使用通道创建一个非阻塞存根。在这里,我们选择计划调用其函数的服务 **"BookStore"**。

  • 然后,我们简单地创建 **.proto** 文件中定义的预期输入,即在我们的例子中,是 **Book**,并添加我们希望服务器添加的标题。

  • 但是鉴于这是服务器和客户端流的情况,我们首先为服务器创建一个 **流观察器**。此服务器流观察器列出了服务器响应时需要执行的操作的行为,即 **onNext()** 和 **onCompleted()**。

  • 并且使用存根,我们也获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的书籍数据。

  • 并且一旦我们的订单完成,我们确保客户端流观察器已关闭。这告诉服务器关闭流并执行清理。

  • 最后,我们关闭 Channel 以避免任何资源泄漏。

因此,这就是我们的客户端代码。

客户端服务器调用

总而言之,我们要做的是:

  • 启动 gRPC 服务器。

  • 客户端通过向服务器通知它们来添加书籍流。

  • 服务器在其商店中搜索书籍并将其添加到购物车。

  • 每次添加书籍时,服务器都会告诉客户端购物车价值。

  • 当客户端完成订购时,服务器和客户端都会关闭流。

现在我们已经定义了我们的 **proto** 文件,编写了我们的服务器和客户端代码,让我们现在执行此代码并查看实际情况。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerClientStreaming

我们将看到以下输出 -

输出

Jul 03, 2021 10:37:21 PM
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

此输出表示服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientBothStreaming

让我们向我们的客户端添加一本书。

Jul 24, 2021 7:21:45 PM
com.tp.bookstore.BookStoreClientBothStreaming main
Type book name to be added to the cart....
Great

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Gr

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:

Total number of Books: 1
Total Order Value: 300

因此,正如我们所看到的,我们得到了订单的当前购物车价值。现在让我们向我们的客户端再添加一本书。

Type book name to be added to the cart....
Passage

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Pa

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

一旦我们添加了书籍并输入“EXIT”,客户端将关闭。

Type book name to be added to the cart....
EXIT
Jul 24, 2021 7:21:59 PM
com.tp.bookstore.BookStoreClientBothStreaming completeOrder
INFO: Done, waiting for server to create order summary...

因此,正如我们所看到的,客户端能够添加书籍。并且随着书籍的添加,服务器将响应当前购物车价值。

gRPC - 客户端调用

gRPC 客户端支持两种类型的客户端调用,即客户端如何调用服务器。以下是两种方式 -

  • 阻塞式客户端调用

  • 异步客户端调用

在本章中,我们将逐一查看它们。

阻塞式客户端调用

gRPC 支持阻塞式客户端调用。这意味着,一旦客户端对服务进行调用,客户端将不会继续执行其余代码,直到它从服务器收到响应。请注意,对于单向调用和服务器流式调用,阻塞式客户端调用是可能的。

请注意,对于单向调用和服务器流式调用,阻塞式客户端调用是可能的。

这是一个单向阻塞式客户端调用的示例。

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientUnaryBlocking client = new
         BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

在以上示例中,我们有,

public BookStoreClientUnaryBlocking(Channel channel) {
   blockingStub = BookStoreGrpc.newBlockingStub(channel);
}

这意味着我们将使用阻塞式 RPC 调用。

然后,我们有,

BookSearch request = BookSearch.newBuilder().setName(bookName).build();

Book response;
response = blockingStub.first(request);

在这里,我们使用 **blockingStub** 调用 RPC **method first()** 来获取书籍详细信息。

类似地,对于服务器流,我们可以使用阻塞式存根 -

logger.info("Querying for book with author: " + author);
BookSearch request =
BookSearch.newBuilder().setAuthor(author).build();

Iterator<Book> response;
try {
   response = blockingStub.searchByAuthor(request);
   while(response.hasNext()) {
   logger.info("Found book: " + response.next());
}

在这里,我们调用 RPC 方法 **searchByAuthor** 方法,并迭代响应,直到服务器流结束。

非阻塞式客户端调用

gRPC 支持非阻塞式客户端调用。这意味着,当客户端对服务进行调用时,它不需要等待服务器响应。为了处理服务器响应,客户端可以简单地传递观察器,该观察器指示在收到响应时该做什么。请注意,对于单向调用和流式调用,非阻塞式客户端调用是可能的。但是,我们将专门关注服务器流式调用的情况,以将其与阻塞式调用进行比较。

请注意,对于单向调用和流式调用,非阻塞式客户端调用是可能的。但是,我们将专门关注服务器流式调用的情况,以将其与阻塞式调用进行比较。

这是一个服务器流式非阻塞式客户端调用的示例。

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingNonBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName());
   private final BookStoreGrpc.BookStoreStub nonBlockingStub;
   public BookStoreClientServerStreamingNonBlocking(Channelchannel) {
      nonBlockingStub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Book> getServerResponseObserver(){
      StreamObserver<Book> observer = new
      StreamObserver<Book>(){
         @Override
         public void onNext(Book book) {
            logger.info("Server returned following book: " +book);
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            logger.info("Server returned following book: " + book);
         }
      };
      return observer;
   }
   public void getBook(String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
      try {
         nonBlockingStub.searchByAuthor(request,getServerResponseObserver());
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientServerStreamingNonBlocking client = new
         BookStoreClientServerStreamingNonBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

正如我们在以上示例中看到的,

public BookStoreClientUnaryNonBlocking(Channel channel) {
   nonBlockingStub = BookStoreGrpc.newStub(channel);
}

它定义了存根是非阻塞式的。类似地,以下代码用于处理我们从服务器接收到的响应。一旦服务器发送响应,我们就记录输出。

public StreamObserver<Book> getServerResponseObserver(){
   StreamObserver<Book> observer = new
   StreamObserver<Book>(){
   ....
   ....
   return observer;
}

以下 gRPC 调用是非阻塞式调用。

logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
   nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
}

这就是我们确保客户端不需要等到服务器完成 **searchByAuthor** 执行的方式。这将由流观察器对象在服务器返回 Book 对象时直接处理。

gRPC - 超时和取消

gRPC 支持为请求分配超时。这是一种执行请求取消的方法。它有助于避免为客户端和服务器使用资源,以执行其结果对客户端无用的请求。

请求超时

gRPC 支持为客户端和服务器指定超时。

  • 客户端可以在运行时指定它希望等待多长时间,然后取消请求。

  • 服务器也可以在其端检查请求是否需要处理,或者客户端是否已放弃请求。

让我们来看一个例子,客户端期望在2秒内收到响应,但服务器花费了更长的时间。所以,这是我们的服务器代码

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
        .setAuthor("Scott Fitzgerald")
        .setPrice(300).build());
      bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
        .setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
        .setAuthor("E.M.Forster")
        .setPrice(500).build());
      bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
        .setAuthor("Scott Fitzgerald")
        .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
        .setAuthor("Harper Lee")
        .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new
      BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends
   BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " +searchQuery.getName());
         logger.info("This may take more time...");
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title ->
         title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

在上面的代码中,服务器搜索客户端提供的书籍标题。我们添加了一个模拟休眠,以便我们可以看到请求超时。

这是我们的客户端代码

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
   public BookStoreClientUnaryBlockingTimeout(Channel channel){
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request =BookSearch.newBuilder().setName(bookName).build();
      Book response;
      try {
         response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
      logger.info("Got following book from server: " +response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5,
         TimeUnit.SECONDS);
      }
   }
}

上面的代码使用标题调用服务器进行搜索。但更重要的是,它为gRPC调用提供了2秒的超时时间

现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器:

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerUnaryTimeout

我们将看到以下输出 -

输出

Jul 31, 2021 12:29:31 PM
com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051

以上输出表明服务器已启动。

Jul 31, 2021 12:29:35 PM
com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl
first
INFO: Searching for book with title: Great

Jul 31, 2021 12:29:35 PM
com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl
first
INFO: This may take more time...

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great

我们将得到以下输出:

输出

Jul 31, 2021 12:29:34 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook

INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED,
description=deadline exceeded after 1.970455800s.
[buffered_nanos=816522700,
remote_addr=localhost/127.0.0.1:50051], cause=null}

因此,正如我们所看到的,客户端在2秒内没有收到响应,因此它取消了请求并将其称为超时,即DEADLINE_EXCEEDED

请求取消

gRPC支持客户端和服务器端都取消请求。客户端可以在运行时指定它希望在取消请求之前等待的时间。服务器也可以在其端检查请求是否需要处理,或者客户端是否已经放弃了请求。

让我们看一个客户端流式传输的例子,其中客户端调用取消。所以,这是我们的服务器代码

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott   Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>
      totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with titlestarting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream:" + t);
            }
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };   
      }
   }
}

此服务器代码是客户端流式传输的一个简单示例。服务器简单地跟踪客户端想要的书籍,最后,它提供订单的总购物车价值。

但是这里关于请求取消没有什么特别之处,因为这是客户端将调用的。所以,让我们看看客户端代码。

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;
   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver>Cart<(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" 
               + "\nTotal number of Books: " 
               + cart.getBooks() 
               + "\nTotal Order Value: " 
               + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
            public void onCompleted() {
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      if(streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null);
      streamClientSender.onCompleted();
   }
   public void cancelOrder() {
      withCancellation.cancel(null);
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = "";
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if(bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

所以,如果我们看到上面的代码,以下行定义了一个启用了取消的上下文。

withCancellation = Context.current().withCancellation();

这是当用户输入取消时将调用的方法。这将取消订单,并让服务器知道。

public void cancelOrder() {
   withCancellation.cancel(null);
}

现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerClientStreaming

我们将看到以下输出:

输出

Jul 31, 2021 3:29:58 PM
com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

上述输出意味着服务器已启动。

现在,让我们启动客户端

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientStreamingClientCancelation

我们将得到以下输出:

输出

Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM
com.tp.bookstore.BookStoreClientStreamingClientCancelation
addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM
com.tp.bookstore.BookStoreClientStreamingClientCancelation$1
onError
INFO: Error while reading response from Server:

io.grpc.StatusRuntimeException: UNAVAILABLE: Channel
shutdownNow invoked

我们将在服务器日志中获得以下数据:

INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM
com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp
l$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM
com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp
l$1 onError
INFO: Error while reading book stream:
io.grpc.StatusRuntimeException: CANCELLED: client cancelled

因此,正如我们所看到的,客户端启动了对其发送到服务器的请求的取消。服务器也收到了关于取消的通知。

gRPC - 发送/接收元数据

gRPC支持发送元数据。元数据基本上是我们想要发送的一组数据,这些数据不是业务逻辑的一部分,同时进行gRPC调用。

让我们看看以下两种情况:

  • 客户端发送元数据,服务器读取它。
  • 服务器发送元数据,客户端读取它。

我们将逐一介绍这两种情况。

客户端发送元数据

如前所述,gRPC支持客户端发送服务器可以读取的元数据。gRPC支持扩展客户端和服务器拦截器,这些拦截器可分别用于写入和读取元数据。让我们举一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:

让我们举一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:

示例

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingMetadata {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingMetadata(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   static class BookClientInterceptor implements ClientInterceptor {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT>
      interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
      ) {
         return new 
         ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
               logger.info("Added metadata");
               headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
               super.start(responseListener, headers);
            }
         };
      }
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName); 
      BookSearch request = BookSearch.newBuilder().setName(bookName).build(); 
      Book response; 
      CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
      try {
         response = blockingStub.withOption(metaDataKey, "bar").first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   } 
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =  ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
      try {
         BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

这里有趣的部分是拦截器

static class BookClientInterceptor implements ClientInterceptor{
   @Override
   public <ReqT, RespT> ClientCall<ReqT, RespT>
   interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new 
      ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
         @Override
         public void start(Listener<RespT>responseListener, Metadata headers) {
            logger.info("Added metadata");
            headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
            super.start(responseListener, headers);
         }
      };
   }
}

我们拦截客户端发出的任何调用,然后在进一步调用之前向其添加主机名元数据。

服务器读取元数据

现在,让我们看看读取此元数据的服务器代码:

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerMetadata {
   private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
   }
   private Server server;
   class BookServerInterceptor implements ServerInterceptor{
      @Override
      public <ReqT, RespT> Listener<ReqT> 
      interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
 
         logger.info("Recieved following metadata: " + headers);
         return next.startCall(call, headers);
      } 
   }
   private void start() throws IOException {
      int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

同样,这里有趣的部分是拦截器

class BookServerInterceptor implements ServerInterceptor{
   @Override
   public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
      logger.info("Recieved following metadata: " + headers);
      return next.startCall(call, headers);
   }
}

我们拦截任何传入服务器的调用,然后在实际方法处理调用之前记录元数据。

客户端-服务器调用

现在让我们看看它的运行情况。要运行代码,请启动两个shell。在第一个shell上执行以下命令启动服务器:

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerMetadata

我们将看到以下输出 -

输出

Jul 31, 2021 5:29:14 PM 
com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, listening on 50051

上述输出意味着服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great

我们将看到以下输出 -

输出

Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli
entInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300

我们将在服务器日志中获得以下数据:

Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept
or interceptCall
INFO: Recieved following metadata:
Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great

正如我们所看到的,服务器能够读取元数据:hostname=MY_HOST,该元数据由客户端添加。

广告