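gRPC - Timeouts & Cancellation
gRPC supports assigning a timeout to a request. A timeout is one way of cancelling a request: it keeps both the client and the server from spending resources on an operation whose result would no longer be of any use to the client.
Request Timeout
gRPC lets both the client and the server take part in handling timeouts.
The client can specify, at runtime, how long it is willing to wait before the request is cancelled.
The server can check, on its side, whether a request still needs to be processed or whether the client has already given up on it.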
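To make the idea concrete, a timeout can be thought of as a deadline: an absolute point in time after which the answer is no longer wanted. The following is a minimal plain-JDK sketch of that idea, not gRPC's own implementation. The class name SimpleDeadline, its methods, and the injectable clock are all assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper (not a gRPC class): a timeout pinned to an absolute instant. */
final class SimpleDeadline {
    private final long expiresAtNanos;
    private final LongSupplier nanoClock;

    private SimpleDeadline(long expiresAtNanos, LongSupplier nanoClock) {
        this.expiresAtNanos = expiresAtNanos;
        this.nanoClock = nanoClock;
    }

    /** Creates a deadline that expires the given duration after the clock's current reading. */
    static SimpleDeadline after(long duration, TimeUnit unit, LongSupplier nanoClock) {
        return new SimpleDeadline(nanoClock.getAsLong() + unit.toNanos(duration), nanoClock);
    }

    /** Time left in milliseconds; negative once the deadline has passed. */
    long remainingMillis() {
        return TimeUnit.NANOSECONDS.toMillis(expiresAtNanos - nanoClock.getAsLong());
    }

    /** True once the clock has reached or passed the expiry instant. */
    boolean isExpired() {
        return expiresAtNanos - nanoClock.getAsLong() <= 0;
    }
}
```

In real code the clock would typically be System::nanoTime. Either side of a call can consult such a value: the caller to decide when to stop waiting, and the callee to decide whether starting an expensive step is still worthwhile.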
Let us take an example in which the client expects a response within 2 seconds, but the server takes longer. Here is our server code.
Example
package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   // In-memory catalogue, keyed by book title.
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee").setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster").setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald").setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee").setPrice(700).build());
   }

   private Server server;

   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
      logger.info("Server started, listening on " + port);

      // Shut the gRPC server down cleanly when the JVM exits.
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }

   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }

   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         logger.info("This may take more time...");
         try {
            // Artificial delay, longer than the client's 2-second timeout.
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
         }
         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title -> title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());

         if (matchingBookTitles.isEmpty()) {
            // onNext(null) is not allowed, so report a missing book as an error status.
            responseObserver.onError(Status.NOT_FOUND
               .withDescription("No book found for: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         Book foundBook = bookMap.get(matchingBookTitles.get(0));
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}
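In the code above, the server searches for the book whose title was supplied by the client. We added a dummy 5-second sleep so that we can see the request time out.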
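The server above sleeps for the full five seconds even if nobody is waiting for the answer any more. A server that wants to stop early can do its work in small steps and check between steps whether the request has been abandoned. The sketch below shows that pattern with plain JDK types only. CancellableWork and runUnlessCancelled are hypothetical names, and the cancellation check is passed in as a BooleanSupplier. Inside a grpc-java handler, one would presumably pass something like () -> Context.current().isCancelled(), which is an assumption about how you would wire it rather than part of the tutorial's code.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: performs slow work in steps and gives up once cancelled. */
final class CancellableWork {

    /**
     * Runs up to `steps` steps of `stepMillis` each.
     * Returns true if every step ran, false if the work was abandoned early.
     */
    static boolean runUnlessCancelled(int steps, long stepMillis, BooleanSupplier cancelled) {
        for (int i = 0; i < steps; i++) {
            if (cancelled.getAsBoolean()) {
                return false; // the caller has given up; stop wasting resources
            }
            try {
                Thread.sleep(stepMillis); // stands in for one slice of real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                return false;
            }
        }
        return true;
    }
}
```

Splitting the five-second sleep into, say, fifty 100-millisecond steps would let the handler notice an expired deadline within roughly one step instead of at the very end.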
Here is our client code.
Example
package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;

   public BookStoreClientUnaryBlockingTimeout(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }

   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         // Give up on the call if no response arrives within 2 seconds.
         response = blockingStub.withDeadlineAfter(2, TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }

   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";

      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = new BookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}
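The code above calls the server with a title to search for. More importantly, it sets a 2-second timeout on the gRPC call through withDeadlineAfter(2, TimeUnit.SECONDS).
Now, let us see this in action. To run the code, open two shells. Start the server in the first shell by executing the following command: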
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
We would see the following output:
Output
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051
The output above shows that the server has started. Once the client (started below) sends its request, the server additionally logs the following:
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: Searching for book with title: Great
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: This may take more time...
Now, let us start the client.
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
We would get the following output:
Output
Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}
So, as we can see, the client did not receive a response within 2 seconds, so it cancelled the request and reported it as a timeout, that is, DEADLINE_EXCEEDED.
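The behaviour seen above (wait up to a limit, then give up and cancel the outstanding work) can be reproduced with the JDK alone, which may help in understanding what the blocking stub does on our behalf. The following is only an analogy built on ExecutorService and Future. TimeoutAnalogy and callWithTimeout are hypothetical names, and the returned strings merely borrow the names of gRPC status codes. This is not how gRPC is implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical JDK-only analogy of a blocking call that carries a timeout. */
final class TimeoutAnalogy {

    /** Returns the call's result, or a status-like string if it did not finish in time. */
    static String callWithTimeout(Callable<String> slowCall, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(slowCall);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker: the result is no longer wanted
            return "DEADLINE_EXCEEDED";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the waiting thread itself was interrupted
            return "CANCELLED";
        } catch (ExecutionException e) {
            return "UNKNOWN"; // the call itself failed
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A call that finishes in time simply yields its result, whereas a call that sleeps past the limit yields the DEADLINE_EXCEEDED marker and has its worker thread interrupted.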
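Request Cancellation
gRPC supports cancelling a request from both the client side and the server side. The client can decide at runtime to cancel a request it no longer needs. The server can likewise check, on its side, whether a request still needs to be processed or whether the client has already given up on it.
Let us look at a client-streaming example in which the client invokes the cancellation. Here is our server code.
Example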
package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());

   // In-memory catalogue, keyed by book title.
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee").setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster").setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald").setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee").setPrice(700).build());
   }

   private Server server;

   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();
      logger.info("Server started, listening on " + port);

      // Shut the gRPC server down cleanly when the JVM exits.
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }

   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }

   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            // Books collected so far for this client's stream.
            ArrayList<Book> bookCart = new ArrayList<>();

            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if (bookEntry.getValue().getName().startsWith(book.getName())) {
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }

            @Override
            public void onError(Throwable t) {
               // Called, for example, when the client cancels the stream.
               logger.info("Error while reading book stream: " + t);
            }

            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };
      }
   }
}
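This server code is a simple example of client-side streaming. The server just keeps track of the books the client wants and, at the end, returns the total cart value of the order.
There is nothing special here with regard to request cancellation, though, because cancellation is something the client invokes. So, let us look at the client code.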
package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;

   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }

   public StreamObserver<Cart> getServerResponseObserver() {
      StreamObserver<Cart> observer = new StreamObserver<Cart>() {
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" +
               "\nTotal number of Books: " + cart.getBooks() +
               "\nTotal Order Value: " + cart.getPrice());
         }

         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }

         @Override
         public void onCompleted() {
         }
      };
      return observer;
   }

   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      if (streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         // Start the call inside the cancellable context so that
         // cancelOrder() actually cancels this RPC.
         withCancellation.run(() ->
            streamClientSender = stub.totalCartValue(getServerResponseObserver()));
      }
      try {
         streamClientSender.onNext(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }

   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if (streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }

   public void cancelOrder() {
      withCancellation.cancel(null);
   }

   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client =
            new BookStoreClientStreamingClientCancelation(channel);
         String bookName = "";

         // Read titles until the user types EXIT (finish the order) or CANCEL (abort it).
         while (true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if (bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if (bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}
So, if we look at the code above, the following line defines a context that has cancellation enabled.
withCancellation = Context.current().withCancellation();
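And here is the method that is called when the user types CANCEL. It cancels the order and lets the server know. In general, cancelling a context cancels the RPCs that were started while that context was current.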
public void cancelOrder() { withCancellation.cancel(null); }
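One way to picture what cancelling a context does is to think of it as a flag with listeners: whoever depends on the work registers a callback, and cancel() flips the flag and fires each callback once. The sketch below is a plain-JDK stand-in for that idea. CancelToken and its methods are hypothetical names, and gRPC's Context offers richer semantics (parent and child contexts, deadlines, a cancellation cause) than this toy version.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a cancellable context; not a gRPC class. */
final class CancelToken {
    private final List<Runnable> listeners = new ArrayList<>();
    private boolean cancelled;

    /** Registers a callback; runs it immediately if the token is already cancelled. */
    synchronized void onCancel(Runnable listener) {
        if (cancelled) {
            listener.run();
        } else {
            listeners.add(listener);
        }
    }

    /** Cancels the token and notifies each listener once. Returns false if already cancelled. */
    boolean cancel() {
        List<Runnable> toNotify;
        synchronized (this) {
            if (cancelled) {
                return false;
            }
            cancelled = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        toNotify.forEach(Runnable::run); // notify outside the lock
        return true;
    }

    synchronized boolean isCancelled() {
        return cancelled;
    }
}
```

In this picture, an in-flight call would register a listener that tells the peer to stop, which corresponds to the server's onError being invoked when the client cancels.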
Now, let us see this in action. To run the code, open two shells. Start the server in the first shell by executing the following command:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
We would see the following output:
Output
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051
The output above shows that the server has started.
Now, let us start the client.
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
We would get the following output:
Output
Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook
INFO: Adding book with title starting with: Great
Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError
INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
And we would get the following in the server logs:
INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onError
INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
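So, as we can see, the client initiated the cancellation of the request it had made to the server, and the server was notified of the cancellation as well.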