- Protocol Buffers 教程
- Protocol Buffers - 首页
- Protocol Buffers - 简介
- Protocol Buffers - 基本应用
- Protocol Buffers - 结构
- Protocol Buffers - 消息
- Protocol Buffers - 字符串
- Protocol Buffers - 数字
- Protocol Buffers - 布尔值
- Protocol Buffers - 枚举
- Protocol Buffers - 重复字段
- Protocol Buffers - 映射
- Protocol Buffers - 嵌套类
- Protocol Buffers - 可选字段和默认值
- Protocol Buffers - 语言无关性
- Protocol Buffers - 复合数据类型
- Protocol Buffers - 命令行使用
- Protocol Buffers - 更新定义规则
- Protocol Buffers - 与 Kafka 集成
- Protocol Buffers - 在其他语言中的使用
- Protocol Buffers 有用资源
- Protocol Buffers - 快速指南
- Protocol Buffers - 有用资源
- Protocol Buffers - 讨论
Protocol Buffers - Kafka 集成
我们已经涵盖了相当多的 Protocol Buffers 及其数据类型的示例。在本章中,让我们再举一个例子,看看 Protocol Buffers 如何与 Kafka 使用的 Schema Registry 集成。让我们首先了解什么是“schema registry”。
Schema Registry
Kafka 是广泛使用的消息队列之一。它用于大规模应用发布-订阅模型。有关 Kafka 的更多信息,请访问此处 - https://tutorialspoint.com/apache_kafka/index.htm
然而,在基本层面上,Kafka **生产者**应该发送一条消息,即 Kafka **消费者**可以读取的信息片段。而这种消息的发送和消费是我们需要 schema 的地方。在大型组织中,有多个团队读取/写入 Kafka 主题时,尤其需要 schema。Kafka 提供了一种方法将此 schema 存储在 *schema registry* 中,然后在生产者/消费者创建/消费消息时创建/使用这些 schema。
维护 schema 有两个主要好处:
**兼容性** - 在大型组织中,必须确保生产消息的团队不会破坏消费这些消息的下游工具。Schema registry 确保更改向后兼容。
**高效编码** - 在每条消息中发送字段名称及其类型会占用空间和计算资源。使用 schema,我们不需要在每条消息中发送此信息。
Schema registry 支持 **Avro、Google Protocol Buffers** 和 **JSON** Schema 作为 schema 语言。这些语言中的 schema 可以存储在 schema registry 中。在本教程中,我们需要 Kafka 设置和 Schema registry 设置。
要安装 Kafka,您可以查看以下链接:
安装 Kafka 后,您可以通过更新 ** /etc/schema-registry/schema-registry.properties ** 文件来设置 Schema Registry。
# where should schema registry listen on listeners=http://0.0.0.0:8081 # Schema registry uses Kafka beneath it, so we need to tell where are the Kafka brokers available kafkastore.bootstrap.servers=PLAINTEXT://hostname:9092,SSL://hostname2:9092 Once done, you can then run: sudo systemctl start confluent-schema-registry
设置完成后,让我们开始将 Google Protocol Buffers 与 Schema Registry 一起使用。
使用 Protocol Buffers Schema 的 Kafka 生产者
让我们继续我们的 **theater** 示例。我们将使用以下 Protocol Buffers schema:
theater.proto
syntax = "proto3"; package theater; option java_package = "com.tutorialspoint.theater"; message Theater { string name = 1; string address = 2; int32 total_capcity = 3; int64 mobile = 4; float base_ticket_price = 5; bool drive_in = 6; enum PAYMENT_SYSTEM{ CASH = 0; CREDIT_CARD = 1; DEBIT_CARD = 2; APP = 3; } PAYMENT_SYSTEM payment = 7; repeated string snacks = 8; map<string, int32> movieTicketPrice = 9; }
现在,让我们创建一个简单的 Kafka **写入器**,它将以这种格式编码的消息写入 Kafka 主题。但为此,首先我们需要在 Maven POM 中添加一些依赖项:
Kafka 客户端,用于使用 Kafka 生产者和消费者
Kafka Protocol Buffers 序列化器,用于序列化和反序列化消息
**Slf4j** simple,确保我们获得 Kafka 的日志
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <!-- https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer --> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-protobuf-serializer</artifactId> <version>5.5.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency>
完成此操作后,让我们现在创建一个 Kafka **生产者**。此生产者将创建并发送一条消息,其中将包含 **theater** 对象。
KafkaProtbufProducer.java
package com.tutorialspoint.kafka; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import com.tutorialspoint.theater.TheaterOuterClass.Theater; import com.tutorialspoint.theater.TheaterOuterClass.Theater.PAYMENT_SYSTEM; public class KafkaProtbufProducer { public static void main(String[] args) throws Exception{ String topicName = "testy1"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("clientid", "foo"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtocol BuffersSerializer"); props.put("schema.registry.url", "https://127.0.0.1:8081"); props.put("auto.register.schemas", "true"); Producer<String, Theater> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, Theater>(topicName, "SilverScreen", getTheater())).get(); System.out.println("Sent to Kafka: \n" + getTheater()); producer.flush(); producer.close(); } public static Theater getTheater() { List<String> snacks = new ArrayList<>(); snacks.add("Popcorn"); snacks.add("Coke"); snacks.add("Chips"); snacks.add("Soda"); Map<String, Integer> ticketPrice = new HashMap<>(); ticketPrice.put("Avengers Endgame", 700); ticketPrice.put("Captain America", 200); ticketPrice.put("Wonder Woman 1984", 400); Theater theater = Theater.newBuilder() .setName("Silver Screener") .setAddress("212, Maple Street, LA, California") .setDriveIn(true) .setTotalCapacity(320) .setMobile(98234567189L) .setBaseTicketPrice(22.45f) .setPayment(PAYMENT_SYSTEM.CREDIT_CARD) .putAllMovieTicketPrice(ticketPrice) .addAllSnacks(snacks) .build(); return theater; } }
以下列出了我们需要了解的一些要点:
我们需要将 Schema Registry URL 传递给生产者。
我们还需要传递正确的 Protocol Buffers 序列化器,该序列化器特定于 Schema Registry。
Schema registry 会在我们发送完成后自动存储 **theater** 对象的 schema。
最后,我们从自动生成的 Java 代码创建了一个 **theater** 对象,这就是我们将要发送的内容。
输出
让我们现在编译并执行代码:
mvn clean install ; java -cp .\target\protobuf-tutorial-1.0.jar com.tutorialspoint.kafka.KafkaProtbufProducer
我们将看到以下输出:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621692205607 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: 7kwQVXjYSz--bE47MiXmjw
已发送到 Kafka
name: "Silver Screener" address: "212, Maple Street, LA, California" total_capacity: 320 mobile: 98234567189 base_ticket_price: 22.45 drive_in: true payment: CREDIT_CARD snacks: "Popcorn" snacks: "Coke" snacks: "Chips" snacks: "Soda" movieTicketPrice { key: "Avengers Endgame" value: 700 } movieTicketPrice { key: "Captain America" value: 200 } movieTicketPrice { key: "Wonder Woman 1984" value: 400 } [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
这意味着我们的消息已发送。
现在,让我们确认 schema 是否已存储在 Schema Registry 中。
curl -X GET https://127.0.0.1:8081/subjects | jq
显示的输出为 **"topicName" + "key/value"**
[ "testy1-value" ]
我们还可以查看 registry 存储的 schema:
curl -X GET https://127.0.0.1:8081/schemas/ids/1 | jq { "schemaType": "PROTOBUF", "schema": "syntax = \"proto3\";\npackage theater;\n\noption java_package = \"com.tutorialspoint.theater\";\n\nmessage Theater { \n string name = 1;\n string address = 2;\n int64 total_capacity = 3;\n int64 mobile = 4;\n float base_ticket_price = 5;\n bool drive_in = 6;\n .theater.Theater.PAYMENT_SYSTEM payment = 7;\n repeated string snacks = 8;\n repeated .theater.Theater.MovieTicketPriceEntry movieTicketPrice = 9;\n\n message MovieTicketPriceEntry {\n option map_entry = true;\n \n string key = 1;\n int32 value = 2;\n }\n enum PAYMENT_SYSTEM { \n CASH = 0;\n CREDIT_CARD = 1;\n DEBIT_CARD = 2;\n APP = 3;\n }\n }\n" }
使用 Protocol Buffers Schema 的 Kafka 消费者
让我们现在创建一个 Kafka **消费者**。此消费者将消费包含 **theater** 对象的消息。
KafkaProtbufConsumer.java
package com.tutorialspoint.kafka; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import com.tutorialspoint.theater.TheaterOuterClass.Theater; import com.tutorialspoint.theater.TheaterOuterClass.Theater.PAYMENT_SYSTEM; public class KafkaProtbufConsumer { public static void main(String[] args) throws Exception{ String topicName = "testy1"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("clientid", "foo"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtocol BuffersSerializer"); props.put("schema.registry.url", "https://127.0.0.1:8081"); props.put("auto.register.schemas", "true"); Producer<String, Theater> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, Theater>(topicName, "SilverScreen", getTheater())).get(); System.out.println("Sent to Kafka: \n" + getTheater()); producer.flush(); producer.close(); } public static Theater getTheater() { List<String> snacks = new ArrayList<>(); snacks.add("Popcorn"); snacks.add("Coke"); snacks.add("Chips"); snacks.add("Soda"); Map<String, Integer> ticketPrice = new HashMap<>(); ticketPrice.put("Avengers Endgame", 700); ticketPrice.put("Captain America", 200); ticketPrice.put("Wonder Woman 1984", 400); Theater theater = Theater.newBuilder() .setName("Silver Screener") .setAddress("212, Maple Street, LA, California") .setDriveIn(true) .setTotalCapacity(320) .setMobile(98234567189L) .setBaseTicketPrice(22.45f) .setPayment(PAYMENT_SYSTEM.CREDIT_CARD) .putAllMovieTicketPrice(ticketPrice) .addAllSnacks(snacks) .build(); return theater; } }
以下列出了我们需要了解的一些要点:
我们需要将 Schema Registry URL 传递给消费者。
我们还需要传递正确的 Protocol Buffers 反序列化器,该反序列化器特定于 Schema Registry。
Schema Registry 会在我们消费完成后自动读取存储的 **theater** 对象的 schema。
最后,我们从自动生成的 Java 代码创建了一个 **theater** 对象,这就是我们将要发送的内容。
输出
让我们现在编译并执行代码:
mvn clean install ; java -cp .\target\protobuf-tutorial-1.0.jar com.tutorialspoint.kafka.KafkaProtbufConsumer offset = 0, key = SilverScreen, value = May 22, 2021 7:50:15 PM com.google.protobuf.TextFormat$Printer$MapEntryAdapter compareTo May 22, 2021 7:50:15 PM com.google.protobuf.TextFormat$Printer$MapEntryAdapter compareTo name: "Silver Screener" address: "212, Maple Street, LA, California" total_capacity: 320 mobile: 98234567189 base_ticket_price: 22.45 drive_in: true payment: CREDIT_CARD snacks: "Popcorn" snacks: "Coke" snacks: "Chips" snacks: "Soda" movieTicketPrice { key: "Captain America" value: 200 } movieTicketPrice { key: "Wonder Woman 1984" value: 400 } movieTicketPrice { key: "Avengers Endgame" value: 700 }
因此,正如我们所看到的,写入 Kafka 的消息被消费者正确地消费了。此外,Registry 存储了 schema,也可以通过 REST API 访问。