gRPC - 一元gRPC
现在我们将了解gRPC框架支持的各种通信类型。我们将以书店为例,客户可以在其中搜索书籍并下单送货。
让我们看看一元gRPC通信,我们让客户端搜索标题,并返回与查询标题匹配的其中一本随机书籍。
.proto 文件
首先,让我们在common_proto_files中定义bookstore.proto文件:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
现在让我们仔细看看上面代码块中的每一行。
syntax = "proto3";
语法
这里的“syntax”表示我们使用的Protobuf版本。我们使用最新的版本3,因此该模式可以使用所有对版本3有效的语法。
package tutorial;
这里的package用于冲突解决,例如,如果我们有多个同名的类/成员。
option java_package = "com.tp.bookstore";
此参数特定于Java,即从.proto文件自动生成代码的包。
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
这表示服务的名称"BookStore"和可以调用的函数名"first"。"first"函数接收类型为"BookSearch"的输入,并返回类型为"Book"的输出。因此,实际上,我们让客户端搜索标题,并返回与查询标题匹配的其中一本书籍。
现在让我们看看这些类型。
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
在上面的代码块中,我们定义了BookSearch,它包含名称、作者和类型等属性。客户端应该向服务器发送类型为“BookSearch”的对象。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
在这里,我们还定义了,给定一个“BookSearch”,服务器将返回包含书籍属性以及书籍价格的“Book”。服务器应该向客户端发送类型为“Book”的对象。
请注意,我们已经完成了Maven设置,用于自动生成我们的类文件以及我们的RPC代码。因此,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用gRPC所需的源代码。源代码将放在:
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置gRPC服务器
现在我们已经定义了包含函数定义的proto文件,让我们设置一个可以调用这些函数的服务器。
让我们编写我们的服务器代码来服务上述函数,并将其保存在com.tp.bookstore.BookeStoreServerUnary.java中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " + searchQuery.getName());
List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->
title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
上面的代码在指定的端口启动一个gRPC服务器,并服务于我们在proto文件中编写的函数和服务。让我们一起浏览上面的代码:
从main方法开始,我们在指定的端口创建一个gRPC服务器。
但在启动服务器之前,我们将服务器分配给我们要运行的服务,即在我们的例子中是BookStore服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中是BookStoreImpl
服务实例需要提供.proto文件中存在的method/function的实现,即在我们的例子中是first方法。
该方法期望一个在.proto文件中定义的类型的对象,即对于我们来说是BookSearch
该方法搜索可用bookMap中的书籍,然后通过调用onNext()方法返回Book。完成后,服务器通过调用onCompleted()来宣布它已完成输出。
最后,我们还有一个关闭钩子,以确保在完成代码执行时服务器能够干净地关闭。
设置gRPC客户端
现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。
让我们编写我们的客户端代码来调用上述函数,并将其保存在com.tp.bookstore.BookStoreClientUnaryBlocking.java中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上面的代码在指定的端口启动一个gRPC服务器,并服务于我们在proto文件中编写的函数和服务。让我们一起浏览上面的代码:
从main方法开始,我们接受一个参数,即我们要搜索的书籍的标题。
我们为与服务器的gRPC通信设置一个Channel。
然后,我们使用channel创建一个阻塞存根。在这里,我们选择要调用其函数的服务“BookStore”。“存根”只不过是一个包装器,它隐藏了远程调用对调用者的复杂性。
然后,我们只需创建在.proto文件中定义的预期输入,即在我们的例子中是BookSearch,并添加我们想要服务器搜索的标题名称。
我们最终进行调用并等待来自服务器的结果。
最后,我们关闭channel以避免任何资源泄漏。
所以,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们要做的是:
启动gRPC服务器。
客户端查询服务器以获取具有给定名称/标题的书籍。
服务器在其商店中搜索书籍。
服务器然后返回书籍及其其他属性。
现在,我们已经定义了我们的proto文件,编写了我们的服务器和客户端代码,让我们继续执行这段代码,看看实际效果。
要运行代码,启动两个shell。通过执行以下命令在第一个shell中启动服务器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnary
我们将看到以下输出:
输出
Jul 03, 2021 7:21:58 PM com.tp.bookstore.BookeStoreServerUnary start INFO: Server started, listening on 50051
上述输出意味着服务器已启动。
现在,让我们启动客户端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"
我们将看到以下输出:
输出
Jul 03, 2021 7:22:03 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Querying for book with title: To Kill Jul 03, 2021 7:22:04 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Got following book from server: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我们所看到的,客户端能够通过查询服务器来获取书籍详细信息。