Apache Kafka - 简单生产者示例



让我们创建一个应用程序,使用 Java 客户端发布和消费消息。Kafka 生产者客户端包含以下 API。

KafkaProducer API

让我们在本节中了解 Kafka 生产者 API 中最重要的部分。KafkaProducer API 的核心部分是 KafkaProducer 类。KafkaProducer 类提供了一个选项,可以在其构造函数中连接 Kafka 代理,并使用以下方法。

  • KafkaProducer 类提供 send 方法以异步方式将消息发送到主题。send() 的签名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - 生产者管理一个等待发送的记录缓冲区。

  • Callback - 用户提供的回调函数,在服务器确认记录后执行(null 表示没有回调)。

  • KafkaProducer 类提供了一个 flush 方法,以确保所有先前发送的消息都已实际完成。flush 方法的语法如下 -

public void flush()
  • KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。此方法的签名如下 -

public Map metrics()

它返回生产者维护的内部指标映射。

  • public void close() - KafkaProducer 类提供 close 方法,该方法会阻塞,直到所有先前发送的请求都已完成。

生产者 API

生产者 API 的核心部分是 Producer 类。Producer 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。

生产者类

生产者类提供 send 方法来发送消息到单个或多个主题,使用以下签名。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

生产者有两种类型:同步异步

相同的 API 配置也适用于 Sync 生产者。它们之间的区别在于同步生产者直接发送消息,而异步生产者在后台发送消息。当您需要更高的吞吐量时,建议使用异步生产者。在之前的版本(例如 0.8)中,异步生产者没有 send() 的回调来注册错误处理程序。这仅在当前 0.9 版本中可用。

public void close()

Producer 类提供close方法来关闭生产者池到所有 Kafka 代理的连接。

配置设置

为了更好地理解,生产者 API 的主要配置设置列在下表中 -

序号 配置设置及说明
1

client.id

标识生产者应用程序

2

producer.type

同步或异步

3

acks

acks 配置控制生产者请求被视为完成的标准。

4

retries

如果生产者请求失败,则自动重试特定次数。

5

bootstrap.servers

代理引导列表。

6

linger.ms

如果要减少请求数量,可以将 linger.ms 设置为大于某个值。

7

key.serializer

序列化器接口的键。

8

value.serializer

序列化器接口的值。

9

batch.size

缓冲区大小。

10

buffer.memory

控制生产者用于缓冲的可用内存总量。

ProducerRecord API

ProducerRecord 是发送到 Kafka 集群的键值对。ProducerRecord 类构造函数用于使用以下签名创建带有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主题 - 将附加到记录的用户定义主题名称。

  • 分区 - 分区数量

  • - 将包含在记录中的键。

  • - 记录内容
public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建带键值对且不带分区的记录。

  • 主题 - 创建一个主题来分配记录。

  • - 记录的键。

  • - 记录内容。

public ProducerRecord (string topic, v value)

ProducerRecord 类创建不带分区和键的记录。

  • 主题 - 创建一个主题。

  • - 记录内容。

ProducerRecord 类的方法列在下表中 -

序号 类方法及说明
1

public string topic()

将附加到记录的主题。

2

public K key()

将包含在记录中的键。如果没有此类键,则此处将返回 null。

3

public V value()

记录内容。

4

partition()

记录的分区数量

SimpleProducer 应用程序

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建您自己的主题。之后,创建一个名为 Sim-pleProducer.java 的 Java 类,并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

编译 - 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 - 可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

输出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

简单消费者示例

到目前为止,我们创建了一个生产者来将消息发送到 Kafka 集群。现在让我们创建一个消费者来从 Kafka 集群消费消息。KafkaConsumer API 用于从 Kafka 集群消费消息。KafkaConsumer 类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - 返回消费者配置的映射。

KafkaConsumer 类具有以下重要方法,列在下表中。

序号 方法及说明
1

public java.util.Set<TopicPar-tition> assignment()

获取消费者当前分配的分区集。

2

public string subscription()

订阅给定的主题列表以获取动态分配的分区。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

订阅给定的主题列表以获取动态分配的分区。

4

public void unsubscribe()

取消订阅给定分区列表中的主题。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为与取消订阅相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

参数 pattern 指的是正则表达式格式的订阅模式,而 listener 参数从订阅模式获取通知。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

手动将分区列表分配给客户。

8

poll()

获取使用其中一个 subscribe/assign API 指定的主题或分区的数据。如果在轮询数据之前没有订阅主题,则将返回错误。

9

public void commitSync()

提交上次 poll() 返回的所有已订阅主题和分区的偏移量。相同的操作应用于 commitAsyn()。

10

public void seek(TopicPartition partition, long offset)

获取消费者将在下一个 poll() 方法中使用的当前偏移量值。

11

public void resume()

恢复已暂停的分区。

12

public void wakeup()

唤醒消费者。

ConsumerRecord API

ConsumerRecord API 用于接收来自 Kafka 集群的记录。此 API 包含主题名称、分区编号(从中接收记录)以及指向 Kafka 分区中记录的偏移量。ConsumerRecord 类用于创建具有特定主题名称、分区数量和 <键,值> 对的消费者记录。它具有以下签名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主题 - 从 Kafka 集群接收的消费者记录的主题名称。

  • 分区 - 主题的分区。

  • - 记录的键,如果不存在键,则返回 null。

  • - 记录内容。

ConsumerRecords API

ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于保存特定主题每个分区的 ConsumerRecord 列表。其构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - 返回特定主题的分区映射。

  • Records - 返回 ConsumerRecord 列表。

ConsumerRecords 类定义了以下方法。

序号 方法及说明
1

public int count()

所有主题的记录数量。

2

public Set partitions()

此记录集中包含数据的分区集(如果未返回数据,则该集合为空)。

3

public Iterator iterator()

迭代器使您能够遍历集合,获取或删除元素。

4

public List records()

获取给定分区的记录列表。

配置设置

消费者客户端 API 的主要配置设置列在下表中 -

序号 设置及说明
1

bootstrap.servers

代理引导列表。

2

group.id

将单个消费者分配到一个组。

3

enable.auto.commit

如果值为 true,则启用偏移量的自动提交,否则不提交。

4

auto.commit.interval.ms

返回更新的已消费偏移量写入 ZooKeeper 的频率。

5

session.timeout.ms

指示 Kafka 在放弃并继续消费消息之前,将等待 ZooKeeper 对请求(读取或写入)做出响应的毫秒数。

SimpleConsumer 应用程序

生产者应用程序的步骤在此保持不变。首先,启动您的 ZooKeeper 和 Kafka 代理。然后创建一个名为 SimpleConsumer 的 Java 类应用程序,名为 SimpleCon-sumer.java,并输入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

编译 - 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 - 可以使用以下命令执行应用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入 - 打开生产者 CLI 并向主题发送一些消息。您可以将示例输入设置为“Hello Consumer”。

输出 - 以下是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
广告