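gRPC - Send/Receive Metadata
gRPC supports sending metadata. Metadata is a set of key-value pairs that is not part of the business logic but travels along with a gRPC call.
Consider the following two cases:
- The client sends metadata and the server reads it.
- The server sends metadata and the client reads it.
We will go through these cases one by one.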
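Client Sends Metadata
As mentioned, gRPC lets the client send metadata that the server can read. gRPC supports custom client and server interceptors, which can be used to write and read metadata respectively. Let us take an example to understand this better. Here is our client code, which sends the hostname as metadata:
Example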
package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingMetadata {
    private static final Logger logger =
        Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
    private final BookStoreGrpc.BookStoreBlockingStub blockingStub;

    public BookStoreClientUnaryBlockingMetadata(Channel channel) {
        blockingStub = BookStoreGrpc.newBlockingStub(channel);
    }

    // Adds a HOSTNAME header to every call made through the channel.
    static class BookClientInterceptor implements ClientInterceptor {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                    next.newCall(method, callOptions)) {
                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    logger.info("Added metadata");
                    headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
                    super.start(responseListener, headers);
                }
            };
        }
    }

    public void getBook(String bookName) {
        logger.info("Querying for book with title: " + bookName);
        BookSearch request = BookSearch.newBuilder().setName(bookName).build();
        Book response;
        // A custom call option stays on the client side; it is not sent as metadata.
        CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
        try {
            response = blockingStub.withOption(metaDataKey, "bar").first(request);
        } catch (StatusRuntimeException e) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return;
        }
        logger.info("Got following book from server: " + response);
    }

    public static void main(String[] args) throws Exception {
        String bookName = args[0];
        String serverAddress = "localhost:50051";
        // Register the interceptor on the channel so it applies to all stubs built on it.
        ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
            .usePlaintext()
            .intercept(new BookClientInterceptor())
            .build();
        try {
            BookStoreClientUnaryBlockingMetadata client =
                new BookStoreClientUnaryBlockingMetadata(channel);
            client.getBook(bookName);
        } finally {
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
The interesting part here is the interceptor.
static class BookClientInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                logger.info("Added metadata");
                headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
                super.start(responseListener, headers);
            }
        };
    }
}
We intercept every call the client makes and add the hostname metadata to its headers before the call proceeds.
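The interceptor sends the fixed string "MY_HOST". If the real machine name is wanted instead, the value could be looked up with the JDK before it is put into the headers. The following is a minimal sketch under that assumption; the class name HostnameResolver and the fallback value are hypothetical and not part of the original example.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not part of the original client code.
class HostnameResolver {
    /** Returns the local host name, or the given fallback if it cannot be resolved. */
    static String resolve(String fallback) {
        try {
            String name = InetAddress.getLocalHost().getHostName();
            return (name == null || name.isEmpty()) ? fallback : name;
        } catch (UnknownHostException e) {
            // No resolvable local address: fall back instead of failing the call.
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println("hostname metadata value: " + resolve("unknown-host"));
    }
}
```

Inside the interceptor's start method, the literal "MY_HOST" could then be replaced by HostnameResolver.resolve("unknown-host"). Resolving the name once and caching it would avoid a lookup on every call.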
Server Reads Metadata
Now, let us look at the server code that reads this metadata:
package com.tp.bookstore;

import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerMetadata {
    private static final Logger logger =
        Logger.getLogger(BookeStoreServerMetadata.class.getName());

    static Map<String, Book> bookMap = new HashMap<>();
    static {
        bookMap.put("Great Gatsby", Book.newBuilder()
            .setName("Great Gatsby")
            .setAuthor("Scott Fitzgerald")
            .setPrice(300)
            .build());
    }

    private Server server;

    // Logs the metadata of every incoming call before the service method runs.
    class BookServerInterceptor implements ServerInterceptor {
        @Override
        public <ReqT, RespT> Listener<ReqT> interceptCall(
                ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
            logger.info("Recieved following metadata: " + headers);
            return next.startCall(call, headers);
        }
    }

    private void start() throws IOException {
        int port = 50051;
        server = ServerBuilder.forPort(port)
            .addService(new BookStoreImpl())
            .intercept(new BookServerInterceptor())
            .build()
            .start();
        logger.info("Server started, listening on " + port);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.err.println("Shutting down gRPC server");
                try {
                    server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
            }
        });
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
        greetServer.start();
        greetServer.server.awaitTermination();
    }

    static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
        @Override
        public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
            logger.info("Searching for book with title: " + searchQuery.getName());
            List<String> matchingBookTitles = bookMap.keySet().stream()
                .filter(title -> title.startsWith(searchQuery.getName().trim()))
                .collect(Collectors.toList());

            if (matchingBookTitles.isEmpty()) {
                // Passing null to onNext would throw, so report a NOT_FOUND status instead.
                responseObserver.onError(Status.NOT_FOUND
                    .withDescription("No book found for: " + searchQuery.getName())
                    .asRuntimeException());
                return;
            }
            Book foundBook = bookMap.get(matchingBookTitles.get(0));
            responseObserver.onNext(foundBook);
            responseObserver.onCompleted();
        }
    }
}
Again, the interesting part here is the interceptor.
class BookServerInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        logger.info("Recieved following metadata: " + headers);
        return next.startCall(call, headers);
    }
}
We intercept every call that reaches the server and log its metadata before the actual service method handles the call.
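Both interceptors can be pictured as wrappers around the call: the client wrapper adds a header and forwards, and the server wrapper reads the headers and forwards. The following is a simplified in-process model of that flow in plain Java. It is not the grpc-java API; Handler, withHostname, and withHeaderLogging are hypothetical names invented for illustration, and no network is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified stand-ins for illustration only, NOT the grpc-java types.
class InterceptorFlowDemo {
    /** A handler receives the headers and a request, and returns a response. */
    interface Handler extends BiFunction<Map<String, String>, String, String> {}

    /** Client side: add a header, then hand the call to the next handler. */
    static Handler withHostname(String hostname, Handler next) {
        return (headers, request) -> {
            headers.put("hostname", hostname);
            return next.apply(headers, request);
        };
    }

    /** Server side: record the headers before the business method runs. */
    static Handler withHeaderLogging(StringBuilder log, Handler next) {
        return (headers, request) -> {
            log.append("Received metadata: ").append(headers);
            return next.apply(headers, request);
        };
    }

    public static void main(String[] args) {
        StringBuilder serverLog = new StringBuilder();
        // The business method never touches the headers.
        Handler service = (headers, request) -> "book for: " + request;
        Handler server = withHeaderLogging(serverLog, service);
        Handler client = withHostname("MY_HOST", server);

        String response = client.apply(new LinkedHashMap<>(), "Great");
        System.out.println(serverLog);  // Received metadata: {hostname=MY_HOST}
        System.out.println(response);   // book for: Great
    }
}
```

The point of the model is the ordering: the header is written before the call leaves the client wrapper and is read before the service method runs, while the service method itself stays free of metadata handling.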
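Client-Server Call
Let us now see this in action. To run the code, open two terminals. Start the server in the first terminal by executing the following command: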
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata
We would see the following output:
Output
Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, listening on 50051
The above output shows that the server has started.
Now, let us start the client.
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great
We would see the following output:
Output
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookClientInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300
The server log will contain the following:
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerInterceptor interceptCall
INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great
As we can see, the server is able to read the metadata added by the client: hostname=MY_HOST.
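Note that the client registered the key as HOSTNAME, while the server log shows hostname. gRPC metadata keys are case-insensitive and are carried as lowercase HTTP/2 header names. The following is a minimal sketch that mimics this normalization in plain Java. It is an illustrative model and not the grpc-java implementation; the class name MetadataKeyModel is hypothetical, and the allowed character set (digits, a-z, underscore, hyphen, dot) is taken from the header-name rule of the gRPC over HTTP/2 protocol as the author of this sketch understands it.

```java
import java.util.Locale;

// Illustrative model only, NOT the grpc-java implementation.
class MetadataKeyModel {
    /** Lowercases a key name and rejects characters outside 0-9, a-z, '_', '-', '.'. */
    static String normalize(String name) {
        String lower = name.toLowerCase(Locale.ROOT);
        for (int i = 0; i < lower.length(); i++) {
            char c = lower.charAt(i);
            boolean allowed = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')
                || c == '_' || c == '-' || c == '.';
            if (!allowed) {
                throw new IllegalArgumentException(
                    "Invalid character '" + c + "' in key: " + name);
            }
        }
        return lower;
    }

    public static void main(String[] args) {
        System.out.println(normalize("HOSTNAME")); // hostname
    }
}
```

Because of this, a server that wants a single value can look the key up by its lowercase name regardless of how the client spelled it.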