gRPC - Timeouts & Cancellation



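gRPC supports assigning a timeout to a request, which is one way of cancelling it. A timeout keeps both the client and the server from spending resources on an operation whose result the client would no longer use.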

Request Timeout

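gRPC lets both the client and the server take part in enforcing a timeout.

  • The client can specify at runtime how long it is willing to wait before the request is cancelled.

  • The server can check on its side whether the request still needs to be processed or whether the client has already given up on it.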
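Both bullets rest on the same bookkeeping: a relative timeout is turned into an absolute deadline, and each side compares the current time against it. The sketch below illustrates that idea in plain Java. It is not gRPC's implementation; the class `TimeBudget` and its methods are hypothetical names introduced only for this illustration, and the current time is passed in explicitly so the arithmetic is easy to follow.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: an absolute deadline derived from a relative timeout. */
final class TimeBudget {
   private final long deadlineNanos;

   private TimeBudget(long deadlineNanos) {
      this.deadlineNanos = deadlineNanos;
   }

   /** Fixes the deadline at "nowNanos + timeout"; later checks reuse it. */
   static TimeBudget after(long timeout, TimeUnit unit, long nowNanos) {
      return new TimeBudget(nowNanos + unit.toNanos(timeout));
   }

   /** Time left at the given instant, never negative. */
   long remaining(TimeUnit unit, long nowNanos) {
      return unit.convert(Math.max(0L, deadlineNanos - nowNanos), TimeUnit.NANOSECONDS);
   }

   /** True once the given instant has reached or passed the deadline. */
   boolean isExpired(long nowNanos) {
      return deadlineNanos - nowNanos <= 0;
   }

   public static void main(String[] args) {
      TimeBudget budget = TimeBudget.after(2, TimeUnit.SECONDS, System.nanoTime());
      // Client side: how long it is still willing to wait.
      System.out.println("Remaining ms: "
         + budget.remaining(TimeUnit.MILLISECONDS, System.nanoTime()));
      // Server side: skip the work if the caller has already given up.
      System.out.println("Expired: " + budget.isExpired(System.nanoTime()));
   }
}
```

In grpc-java the corresponding type is `io.grpc.Deadline`. Because a deadline is an absolute point in time, it starts counting from the moment it is created, not from the moment the call is sent.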

Consider an example in which the client expects a response within 2 seconds but the server takes longer. Here is the server code:

Example

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
        .setAuthor("Scott Fitzgerald")
        .setPrice(300).build());
      bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
        .setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
        .setAuthor("E.M.Forster")
        .setPrice(500).build());
      bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
        .setAuthor("Scott Fitzgerald")
        .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
        .setAuthor("Harper Lee")
        .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " +searchQuery.getName());
         logger.info("This may take more time...");
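         try {
            // Simulate slow work so that the client's 2-second deadline expires first
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption
            Thread.currentThread().interrupt();
         }
         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title -> title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());
         if (matchingBookTitles.isEmpty()) {
            // onNext(null) is not allowed, so report a miss as NOT_FOUND
            responseObserver.onError(io.grpc.Status.NOT_FOUND
               .withDescription("No book matches the given title")
               .asRuntimeException());
            return;
         }
         responseObserver.onNext(bookMap.get(matchingBookTitles.get(0)));
         responseObserver.onCompleted();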
      }
   }
}

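In the code above, the server looks up the book whose title starts with the text supplied by the client. A dummy 5-second sleep is added so that the request can be seen timing out.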
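A fixed sleep keeps the handler busy even after the client has stopped waiting. A handler that does real work in steps could instead check between steps whether anyone is still interested in the result. The sketch below shows that pattern in plain Java, without gRPC. The names `CooperativeSearch`, `searchInSteps` and the `cancelled` supplier are hypothetical; the supplier stands in for a check such as `Context.current().isCancelled()` in grpc-java.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;

final class CooperativeSearch {
   /**
    * Scans titles one at a time and stops early once the caller has cancelled.
    * Returns the first title with the given prefix, or empty if none was found
    * before cancellation or before the end of the list.
    */
   static Optional<String> searchInSteps(List<String> titles, String prefix,
                                         BooleanSupplier cancelled) {
      for (String title : titles) {
         if (cancelled.getAsBoolean()) {
            // Nobody is waiting for the answer any more; abandon the work.
            return Optional.empty();
         }
         if (title.startsWith(prefix)) {
            return Optional.of(title);
         }
      }
      return Optional.empty();
   }

   public static void main(String[] args) {
      List<String> titles = Arrays.asList("Passage to India", "Great Gatsby");
      // Caller still waiting: the match is returned.
      System.out.println(searchInSteps(titles, "Great", () -> false)); // Optional[Great Gatsby]
      // Caller already gone: the search stops before doing any work.
      System.out.println(searchInSteps(titles, "Great", () -> true));  // Optional.empty
   }
}
```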

Here is the client code:

Example

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingTimeout(Channel channel){
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
      Book response;
      try {
         // The deadline starts counting when withDeadlineAfter is called, so it is set per request
         response = blockingStub.withDeadlineAfter(2, TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
      logger.info("Got following book from server: " +response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = new BookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

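The code above calls the server with a title to search for. More importantly, it sets a 2-second deadline on the gRPC call.

Now let us see this in action. To run the code, open two shells. In the first shell, start the server with the following command: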

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout

We will see the following output:

Output

Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051

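The output above shows that the server has started. Once the client (started in the next step) sends its request, the server also logs the following: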

Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: Searching for book with title: Great

Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: This may take more time...

Now, let us start the client in the second shell:

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great

We will get the following output:

Output

Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}

As we can see, the client did not receive a response within 2 seconds, so it cancelled the request and reported it as a timeout, i.e. DEADLINE_EXCEEDED.
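The same wait-then-give-up behaviour can be reproduced without gRPC, which may help when reasoning about what the blocking call does from the caller's point of view. The sketch below is only an analogy built on `java.util.concurrent`, not grpc-java code. `TimeoutAnalogy` and `callWithTimeout` are hypothetical names, and the string `DEADLINE_EXCEEDED` merely mimics the status name seen in the log above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutAnalogy {
   /** Waits up to timeoutMillis for the task and cancels it when time runs out. */
   static String callWithTimeout(Callable<String> task, long timeoutMillis)
         throws InterruptedException, ExecutionException {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Future<String> future = executor.submit(task);
      try {
         return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
         // Give up and interrupt the worker so it stops consuming resources.
         future.cancel(true);
         return "DEADLINE_EXCEEDED";
      } finally {
         executor.shutdownNow();
      }
   }

   public static void main(String[] args) throws Exception {
      // Slow "server": needs 5 seconds while the caller waits only 2.
      System.out.println(callWithTimeout(() -> {
         Thread.sleep(5000);
         return "Great Gatsby";
      }, 2000)); // prints DEADLINE_EXCEEDED
      // Fast "server": answers well within the limit.
      System.out.println(callWithTimeout(() -> "Great Gatsby", 2000)); // prints Great Gatsby
   }
}
```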

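Request Cancellation

gRPC supports cancelling a request from both the client side and the server side. The client can specify at runtime how long it is willing to wait before cancelling the request, or cancel it explicitly. The server can check on its side whether the request still needs to be processed or whether the client has already given up on it.

Let us look at a client-streaming example in which the client triggers the cancellation. Here is the server code:

Example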

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream:" + t);
            }
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };   
      }
   }
}

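This server code is a simple example of client-side streaming. The server keeps track of the books the client wants and, at the end, returns the total cart value of the order.

Nothing here is specific to request cancellation, because cancellation is an action the client invokes. So let us look at the client code.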

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;
   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>() {
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" 
               + "\nTotal number of Books: " 
               + cart.getBooks() 
               + "\nTotal Order Value: " 
               + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
         public void onCompleted() {
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

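      if(streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         // Start the call while the cancellable context is current; otherwise
         // cancelling the context later would not reach this RPC.
         withCancellation.run(() ->
            streamClientSender = stub.totalCartValue(getServerResponseObserver()));
      }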
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public void cancelOrder() {
      withCancellation.cancel(null);
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel);
         String bookName = "";
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if(bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

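Looking at the code above, the following line creates a context that supports cancellation. In grpc-java, cancelling such a context only affects calls that were started while it was the current context.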

withCancellation = Context.current().withCancellation();

The following method is invoked when the user types CANCEL. It cancels the order and lets the server know.

public void cancelOrder() {
   withCancellation.cancel(null);
}
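One detail is easy to miss: cancelling a context only affects work that was started under that context. The sketch below is a simplified plain-Java model of that rule, not grpc-java's implementation; `CancelScope` and its methods are hypothetical names introduced for illustration. In grpc-java itself, a call is tied to a `CancellableContext` by starting it inside `context.run(...)` or between `attach()` and `detach(...)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a cancellable scope with listeners. */
final class CancelScope {
   private final List<Runnable> listeners = new ArrayList<>();
   private boolean cancelled;

   /** Registers work that should be told when this scope is cancelled. */
   synchronized void onCancel(Runnable listener) {
      if (cancelled) {
         listener.run(); // already cancelled: notify immediately
      } else {
         listeners.add(listener);
      }
   }

   /** Cancels the scope once and notifies every registered listener. */
   synchronized void cancel() {
      if (cancelled) {
         return;
      }
      cancelled = true;
      for (Runnable listener : listeners) {
         listener.run();
      }
      listeners.clear();
   }

   synchronized boolean isCancelled() {
      return cancelled;
   }

   public static void main(String[] args) {
      CancelScope scope = new CancelScope();
      boolean[] streamOpen = {true, true};
      // Stream 0 is started under the scope; stream 1 is never registered.
      scope.onCancel(() -> streamOpen[0] = false);
      scope.cancel();
      System.out.println(streamOpen[0]); // false: cancelled together with the scope
      System.out.println(streamOpen[1]); // true: unaffected by the scope
   }
}
```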

Now let us see this in action. To run the code, open two shells. In the first shell, start the server with the following command:

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming

We will see the following output:

Output

Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

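The output above shows that the server has started.

Now, let us start the client in the second shell:

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation

We will get output similar to the following; the exact status reported on the client side depends on how the call is terminated:

Output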

Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError
INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked

And the server log will show the following:

INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onError
INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled

As we can see, the client cancelled the request it had made to the server, and the server was notified of the cancellation.
