gRPC - 服务器流式RPC



现在让我们讨论一下在使用 gRPC 通信时服务器流是如何工作的。在这种情况下,客户端将搜索具有给定作者的书籍。假设服务器需要一些时间来遍历所有书籍。服务器不会等到遍历完所有书籍后才提供所有书籍,而是会以流式的方式提供书籍,即一旦找到一本书就提供一本书。

.proto 文件

首先,让我们在 common_proto_files 中定义 bookstore.proto 文件 -

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc first (BookSearch) returns (stream Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

以下代码块表示服务名称 "BookStore" 和可以调用的函数名称 "searchByAuthor"。 "searchByAuthor" 函数接收类型为 "BookSearch" 的输入,并返回类型为 "Book" 的流。因此,实际上,我们允许客户端搜索标题并返回与查询的作者匹配的书籍之一。

service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}

现在让我们看看这些类型。

message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

在这里,我们定义了 BookSearch,它包含一些属性,如 nameauthorgenre。客户端应该将类型为 "BookSearch" 的对象发送到服务器。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

我们还定义了,给定一个 "BookSearch",服务器将返回一个 "Book" 流,其中包含书籍属性以及书籍的价格。服务器应该发送一个 "Book" 流。

请注意,我们已经完成了 Maven 设置,用于自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目 -

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以为这些函数提供服务的服务器。

让我们编写我们的服务器代码来服务上述函数,并将其保存在 com.tp.bookstore.BookeStoreServerStreaming.java 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with author: " + searchQuery.getAuthor());
         for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
            try {
               logger.info("Going through more books....");
               Thread.sleep(5000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
    
            if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
               logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");

               responseObserver.onNext(bookEntry.getValue());
            } 
         }
         responseObserver.onCompleted();
      }
   }
}

以上代码在指定端口启动 gRPC 服务器,并为我们在 proto 文件中编写的函数和服务提供服务。让我们一起浏览以上代码 -

  • main 方法开始,我们在指定端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中,是 BookStore 服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中,是 BookStoreImpl

  • 服务实例需要提供 .proto 文件中存在的方法/函数的实现,即在我们的例子中,是 searchByAuthor 方法。

  • 该方法期望一个类型为 .proto 文件中定义的对象,即对于我们来说是 BookSearch

  • 请注意,我们添加了一个 sleep 来模拟遍历所有书籍的操作。在流式传输的情况下,服务器不会等待所有搜索到的书籍都可用。它在可用时通过使用 onNext() 调用返回书籍。

  • 当服务器完成请求时,它通过调用 onCompleted() 关闭通道。

  • 最后,我们还有一个关闭钩子,以确保在完成执行代码时服务器能够干净地关闭。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在 com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
	public BookStoreClientServerStreamingBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook((String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder()..setAuthor(author).build();
      Iterator<Book> response; 
      try {
         response = blockingStub.searchByAuthor(request);
         while(response.hasNext()) {
            logger.info("Found book: " + response.next());
         }
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
	   
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
        .usePlaintext()
        .build();
 
      try {
         BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

以上代码在指定端口启动 gRPC 服务器,并为我们在 proto 文件中编写的函数和服务提供服务。让我们一起浏览以上代码 -

  • main 方法开始,我们接受一个参数,即我们要搜索的书籍的 title

  • 我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 然后,我们使用 channel 创建一个 阻塞存根。在这里,我们选择服务 "BookStore",我们计划调用其函数。

  • 然后,我们简单地创建 .proto 文件中定义的预期输入,即在我们的例子中是 BookSearch,并添加我们希望服务器搜索的标题。

  • 我们最终进行调用并获取有效 Books 上的迭代器。当我们迭代时,我们得到服务器提供的相应 Books。

  • 最后,我们关闭 channel 以避免任何资源泄漏。

所以,这就是我们的客户端代码。

客户端服务器调用

总而言之,我们要做的是以下内容 -

  • 启动 gRPC 服务器。

  • 客户端查询服务器以获取具有给定作者的书籍。

  • 服务器在其存储中搜索书籍,这是一个耗时的过程。

  • 服务器在找到符合给定条件的书籍时做出响应。服务器不会等待所有有效的书籍都可用。它在找到一本书时立即发送输出。然后重复此过程。

现在,我们已经定义了我们的 proto 文件,编写了我们的服务器和客户端代码,让我们继续执行此代码并查看实际情况。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 上启动服务器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerStreaming

我们将看到以下输出 -

输出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

以上输出表示服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"

我们将看到以下输出 -

输出

Jul 03, 2021 10:40:31 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Querying for book with author: Har

Jul 03, 2021 10:40:37 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "Go Set a Watchman"
author: "Harper Lee"
price: 700

Jul 03, 2021 10:40:42 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

因此,正如我们所看到的,客户端能够通过使用书籍名称查询服务器来获取书籍详细信息。但更重要的是,客户端在不同的时间戳获取了第一本书和第二本书,即大约 5 秒的间隔。

广告