gRPC - 发送/接收元数据



gRPC 支持发送元数据。元数据基本上是我们想要发送的一组数据,它不是业务逻辑的一部分,但在进行 gRPC 调用时会用到。

让我们来看以下两种情况:

  • 客户端发送元数据,服务器读取它。
  • 服务器发送元数据,客户端读取它。

我们将逐一介绍这两种情况。

客户端发送元数据

如前所述,gRPC 支持客户端发送服务器可以读取的元数据。gRPC 支持扩展客户端和服务器拦截器,分别用于写入和读取元数据。让我们通过一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:

让我们通过一个例子来更好地理解它。这是我们的客户端代码,它发送主机名作为元数据:

示例

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingMetadata {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingMetadata(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   static class BookClientInterceptor implements ClientInterceptor {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT>
      interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
      ) {
         return new 
         ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
               logger.info("Added metadata");
               headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
               super.start(responseListener, headers);
            }
         };
      }
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName); 
      BookSearch request = BookSearch.newBuilder().setName(bookName).build(); 
      Book response; 
      CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
      try {
         response = blockingStub.withOption(metaDataKey, "bar").first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   } 
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =  ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
      try {
         BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

这里有趣的部分是拦截器

static class BookClientInterceptor implements ClientInterceptor{
   @Override
   public <ReqT, RespT> ClientCall<ReqT, RespT>
   interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new 
      ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
         @Override
         public void start(Listener<RespT>responseListener, Metadata headers) {
            logger.info("Added metadata");
            headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
            super.start(responseListener, headers);
         }
      };
   }
}

我们拦截客户端发出的任何调用,然后在进一步调用之前向其中添加主机名元数据。

服务器读取元数据

现在,让我们看看读取此元数据的服务器代码:

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerMetadata {
   private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
   }
   private Server server;
   class BookServerInterceptor implements ServerInterceptor{
      @Override
      public <ReqT, RespT> Listener<ReqT> 
      interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
 
         logger.info("Recieved following metadata: " + headers);
         return next.startCall(call, headers);
      } 
   }
   private void start() throws IOException {
      int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

同样,这里有趣的部分是拦截器

class BookServerInterceptor implements ServerInterceptor{
   @Override
   public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
      logger.info("Recieved following metadata: " + headers);
      return next.startCall(call, headers);
   }
}

我们拦截进入服务器的任何调用,然后在实际方法处理调用之前记录元数据。

客户端-服务器调用

现在让我们看看它的实际运行情况。要运行代码,请启动两个终端。在第一个终端上启动服务器,执行以下命令:

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerMetadata

我们将看到以下输出:

输出

Jul 31, 2021 5:29:14 PM 
com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, listening on 50051

上述输出表明服务器已启动。

现在,让我们启动客户端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great

我们将看到以下输出:

输出

Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli
entInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300

服务器日志中将包含以下数据:

Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept
or interceptCall
INFO: Recieved following metadata:
Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great

正如我们所看到的,服务器能够读取客户端添加的元数据:hostname=MY_HOST。

广告