gRPC - 超时 & 取消
gRPC 支持为请求分配超时时间。这是一种执行请求取消的方式。它有助于避免为客户端和服务器使用资源,因为这些资源对于客户端来说其结果将毫无用处。
请求超时
gRPC 支持为客户端和服务器指定超时时间。
客户端可以在运行时指定它希望在取消请求之前等待的时间。
服务器也可以在其端检查是否需要处理请求,或者客户端是否已放弃该请求。
让我们举一个例子,客户端期望在 2 秒内收到响应,但服务器需要更长时间。所以,这是我们的服务器代码。
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnaryTimeout {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerUnaryTimeout greetServer = new
BookeStoreServerUnaryTimeout();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends
BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " +searchQuery.getName());
logger.info("This may take more time...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> matchingBookTitles = bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
在上面的代码中,服务器搜索客户端提供的标题对应的书籍。我们添加了一个虚拟休眠,以便我们可以看到请求超时。
这是我们的客户端代码
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlockingTimeout {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
public BookStoreClientUnaryBlockingTimeout(Channel channel){
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request =BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
return;
}
logger.info("Got following book from server: " +response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上面的代码使用标题调用服务器进行搜索。但更重要的是,它为 gRPC 调用提供了2 秒的超时时间。
现在让我们看看它的实际操作。要运行代码,请启动两个 shell。在第一个 shell 上启动服务器,执行以下命令:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
我们会看到以下输出:
输出
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start INFO: Server started, listening on 50051
以上输出表明服务器已启动。
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: Searching for book with title: Great Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: This may take more time...
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
我们会得到以下输出:
输出
Jul 31, 2021 12:29:34 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED,
description=deadline exceeded after 1.970455800s.
[buffered_nanos=816522700,
remote_addr=localhost/127.0.0.1:50051], cause=null}
因此,正如我们所看到的,客户端在 2 秒内没有收到响应,因此它取消了请求并将其称为超时,即DEADLINE_EXCEEDED
请求取消
gRPC 支持从客户端和服务器端取消请求。客户端可以在运行时指定它希望在取消请求之前等待的时间。服务器也可以在其端检查是否需要处理请求,或者客户端是否已放弃该请求。
让我们看一个客户端流式传输的例子,其中客户端调用取消。所以,这是我们的服务器代码
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book>
totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book) {
logger.info("Searching for book with titlestarting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream:" + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
}
此服务器代码是一个客户端端流式传输的简单示例。服务器只是跟踪客户端想要的书籍,最后它提供订单的总购物车价值。
但是这里在请求取消方面没有什么特别的,因为这是客户端将调用的操作。所以,让我们看看客户端代码。
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookStoreClientStreamingClientCancelation {
private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
private final BookStoreStub stub;
StreamObserver<Book> streamClientSender;
private CancellableContext withCancellation;
public BookStoreClientStreamingClientCancelation(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver>Cart<(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:"
+ "\nTotal number of Books: "
+ cart.getBooks()
+ "\nTotal Order Value: "
+ cart.getPrice());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from Server: " + t);
}
@Override
public void onCompleted() {
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
withCancellation = Context.current().withCancellation();
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public void cancelOrder() {
withCancellation.cancel(null);
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
if(bookName.equals("CANCEL")) {
client.cancelOrder();
break;
}
client.addBook(bookName);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
因此,如果我们查看上面的代码,以下行定义了一个启用了取消功能的上下文。
withCancellation = Context.current().withCancellation();
这是用户键入 CANCEL 时将调用的方法。这将取消订单并让服务器知道。
public void cancelOrder() {
withCancellation.cancel(null);
}
现在让我们看看它的实际操作。要运行代码,请启动两个 shell。在第一个 shell 上启动服务器,执行以下命令。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们会看到以下输出:
输出
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start INFO: Server started, listening on 50051
以上输出表示服务器已启动。
现在,让我们启动客户端
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
我们会得到以下输出:
输出
Type book name to be added to the cart.... Great Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... CANCEL Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
并且我们将在服务器日志中获得以下数据:
INFO: Searching for book with title starting with: Great Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onNext INFO: Found book, adding to cart:.... Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onError INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
因此,正如我们所看到的,客户端启动了对其向服务器发出的请求的取消操作。服务器也收到了关于取消的通知。