gRPC - 双向RPC



现在让我们看看在使用 gRPC 通信时客户端-服务器流式是如何工作的。在这种情况下,客户端将搜索书籍并添加到购物车。每次添加书籍时,服务器都会以实时购物车值进行响应。

.proto 文件

首先,让我们在 **common_proto_files** 中定义 **bookstore.proto** 文件 -

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

以下代码块表示服务名称 **"BookStore"** 和可调用的函数名称 "liveCartValue"。**"liveCartValue"** 函数接收类型为 **"Book"** 的输入,该输入是一个流。该函数返回一个类型为 **"Cart"** 的对象流。因此,实际上,我们允许客户端以流式方式添加书籍,并且每当添加新书籍时,服务器都会将当前购物车值响应给客户端。

service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}

现在让我们看看这些类型。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客户端将发送它想要购买的 **"Book"**。它不需要是完整的书籍信息;它可以仅仅是书籍的标题。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

服务器在获取书籍列表后,将返回 **"Cart"** 对象,它仅仅是客户端已购买的书籍总数和总价。

请注意,我们已经完成了 Maven 设置,以便自动生成我们的类文件以及我们的 RPC 代码。因此,现在我们可以简单地编译我们的项目。

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放置在

Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。

让我们编写我们的服务器代码来服务上述函数,并将其保存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerBothStreaming {
   private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            int cartValue = 0;
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with titlestarting with: " + book.getName());
               for (Entry<String, Book> bookEntry :bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding tocart:....");
                     bookCart.add(bookEntry.getValue());
                     cartValue +=bookEntry.getValue().getPrice();
                  }
               }
               logger.info("Updating cart value...");

               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               logger.info("Order completed");
               responseObserver.onCompleted();
            }
         };
      }
   }
}

以上代码在指定端口启动一个 gRPC 服务器,并服务于我们在 **proto** 文件中编写的函数和服务。让我们逐步了解以上代码 -

  • 从 **main** 方法开始,我们在指定端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们将要运行的服务分配给服务器,即在我们的例子中,是 **BookStore** 服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中是 **BookStoreImpl**

  • 服务实例需要提供 **proto** 文件中存在的 method/function 的实现,即在我们的例子中是 **totalCartValue** 方法。

  • 现在,鉴于这是服务器和客户端流式的情况,服务器将获得客户端添加的 **Books** 列表(在 **proto** 文件中定义)。因此,服务器返回一个自定义流观察者。此流观察者实现了在找到新 Book 时会发生什么以及在流关闭时会发生什么。

  • 当客户端添加一个 Book 时,gRPC 框架将调用 **onNext()** 方法。此时,服务器将其添加到购物车并使用响应观察者返回购物车值。在流式传输的情况下,服务器不会等待所有有效书籍都可用。

  • 当客户端完成添加书籍后,将调用流观察者的 **onCompleted()** 方法。此方法实现了服务器在客户端完成添加 **Books** 后想要执行的操作,即声明它已完成接收客户端订单。

  • 最后,我们还有一个关闭钩子,以确保在完成执行代码后干净地关闭服务器。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写我们的客户端代码来调用上述函数,并将其保存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientBothStreaming {
   private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
   private final BookStoreStub stub;
   private boolean serverIntermediateResponseCompleted = true;
   private boolean serverResponseCompleted = false;

   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientBothStreaming(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver>Cart< getServerResponseObserver(){
      StreamObserver>Cart< observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + 
               "\nTotal number of Books:" + cart.getBooks()+ 
               "\nTotal Order Value:" cart.getPrice());

            serverIntermediateResponseCompleted = true;
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
      if(streamClientSender == null) {
         streamClientSender =stub.liveCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create ordersummary...");
      if(streamClientSender != null); {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientBothStreaming client = new
         BookStoreClientBothStreaming(channel);
         String bookName = "";

         while(true) {
            if(client.serverIntermediateResponseCompleted ==true) {
               System.out.println("Type book name to beadded to the cart....");
               bookName = System.console().readLine();
               if(bookName.equals("EXIT")) {
                  client.completeOrder();
                  break;
               }
               client.serverIntermediateResponseCompleted = false;
               client.addBook(bookName);
               Thread.sleep(500);
            }
         }
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
            
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }   
   }
}

以上代码启动一个 gRPC 客户端,并连接到指定端口的服务器,并调用我们在 **proto** 文件中编写的函数和服务。让我们逐步了解以上代码 -

  • 从 **main** 方法开始,我们接受要添加到购物车的书籍名称。一旦所有书籍都将被添加,用户预计会打印“EXIT”。

  • 我们为与服务器的 gRPC 通信设置了一个 Channel。

  • 接下来,我们使用 channel 创建一个非阻塞存根。在这里,我们选择要调用其函数的服务 **"BookStore"**。

  • 然后,我们简单地创建 **proto** 文件中定义的预期输入,即在我们的例子中是 **Book**,并添加我们希望服务器添加的标题。

  • 但是鉴于这是服务器和客户端流式的情况,我们首先为服务器创建一个 **流观察者**。此服务器流观察者列出了服务器响应时需要执行的操作,即 **onNext()** 和 **onCompleted()**。

  • 并且使用存根,我们也获得了客户端流观察者。我们使用此流观察者发送数据,即要添加到购物车的 **Book**。

  • 并且一旦我们的订单完成,我们确保客户端流观察者已关闭。这告诉服务器关闭流并执行清理。

  • 最后,我们关闭 channel 以避免任何资源泄漏。

所以,那是我们的客户端代码。

客户端服务器调用

总而言之,我们要做的是以下几点 -

  • 启动 gRPC 服务器。

  • 客户端通过通知服务器添加书籍流。

  • 服务器在其商店中搜索书籍并将其添加到购物车。

  • 每次添加书籍时,服务器都会告诉客户端购物车值。

  • 当客户端完成订购时,服务器和客户端都关闭流。

现在我们已经定义了 **proto** 文件,编写了服务器和客户端代码,让我们现在执行此代码并查看实际效果。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 中启动服务器 -

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerClientStreaming

我们将看到以下输出 -

输出

Jul 03, 2021 10:37:21 PM
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

此输出表示服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientBothStreaming

让我们向客户端添加一本书。

Jul 24, 2021 7:21:45 PM
com.tp.bookstore.BookStoreClientBothStreaming main
Type book name to be added to the cart....
Great

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Gr

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:

Total number of Books: 1
Total Order Value: 300

因此,正如我们所看到的,我们获得了订单的当前购物车值。现在让我们向客户端再添加一本书。

Type book name to be added to the cart....
Passage

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Pa

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

一旦我们添加了书籍并输入“EXIT”,客户端将关闭。

Type book name to be added to the cart....
EXIT
Jul 24, 2021 7:21:59 PM
com.tp.bookstore.BookStoreClientBothStreaming completeOrder
INFO: Done, waiting for server to create order summary...

因此,正如我们所看到的,客户端能够添加书籍。并且随着书籍的添加,服务器会以当前购物车值进行响应。

广告