Apache Kafka 快速指南



Apache Kafka - 简介

在大数据中,使用了海量数据。关于数据,我们面临两个主要挑战。第一个挑战是如何收集大量数据,第二个挑战是如何分析收集到的数据。为了克服这些挑战,您需要一个消息系统。

Kafka专为分布式高吞吐量系统而设计。Kafka往往可以很好地替代更传统的消息代理。与其他消息系统相比,Kafka具有更高的吞吐量、内置分区、复制和固有的容错性,这使其非常适合大规模消息处理应用程序。

什么是消息系统?

消息系统负责将数据从一个应用程序传输到另一个应用程序,这样应用程序可以专注于数据,而不必担心如何共享数据。分布式消息传递基于可靠消息排队的概念。消息在客户端应用程序和消息系统之间异步排队。两种类型的消息模式可用——一种是点对点,另一种是发布-订阅(pub-sub)消息系统。大多数消息模式遵循**pub-sub**。

点对点消息系统

在点对点系统中,消息持久化在队列中。一个或多个消费者可以消费队列中的消息,但特定消息最多只能被一个消费者消费。一旦消费者读取队列中的消息,它就会从该队列中消失。此系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。下图描述了该结构。

point-to-point Messaging system

发布-订阅消息系统

在发布-订阅系统中,消息持久化在主题中。与点对点系统不同,消费者可以订阅一个或多个主题并消费该主题中的所有消息。在发布-订阅系统中,消息生产者称为发布者,消息消费者称为订阅者。一个现实世界的例子是卫星电视,它发布不同的频道,如体育、电影、音乐等,任何人都可以订阅他们自己的一组频道并在他们的订阅频道可用时接收它们。

Publish-Subscribe Messaging system

什么是Kafka?

Apache Kafka是一个分布式发布-订阅消息系统和一个强大的队列,它可以处理大量数据,并使您可以将消息从一个端点传递到另一个端点。Kafka适用于离线和在线消息消费。Kafka消息持久化到磁盘,并在集群内复制以防止数据丢失。Kafka构建在ZooKeeper同步服务之上。它与Apache Storm和Spark集成得很好,用于实时流数据分析。

优势

以下是Kafka的一些优势:

  • **可靠性** - Kafka是分布式的、分区的、复制的和容错的。

  • **可扩展性** - Kafka消息系统可以轻松扩展而无需停机。

  • **持久性** - Kafka使用“分布式提交日志”,这意味着消息尽可能快地持久化到磁盘,因此它是持久的。

  • **性能** - Kafka对发布和订阅消息都具有高吞吐量。即使存储了TB级消息,它也能保持稳定的性能。

Kafka非常快,保证零停机时间和零数据丢失。

用例

Kafka可以用于许多用例。其中一些列在下面:

  • **指标** - Kafka通常用于操作监控数据。这涉及聚合来自分布式应用程序的统计数据,以生成集中的操作数据馈送。

  • **日志聚合解决方案** - Kafka可用于跨组织收集来自多个服务的日志,并以标准格式将其提供给多个消费者。

  • **流处理** - Storm和Spark Streaming等流行框架从主题读取数据,处理它,并将处理后的数据写入新主题,在那里它可供用户和应用程序使用。Kafka强大的持久性在流处理中也非常有用。

对Kafka的需求

Kafka是一个统一的平台,用于处理所有实时数据馈送。Kafka支持低延迟消息传递,并在机器故障情况下保证容错性。它能够处理大量不同的消费者。Kafka非常快,每秒执行200万次写入。Kafka将所有数据持久化到磁盘,这意味着所有写入都进入操作系统的页面缓存(RAM)。这使得将数据从页面缓存传输到网络套接字非常高效。

Apache Kafka - 基础知识

在深入研究Kafka之前,您必须了解主要术语,例如主题、代理、生产者和消费者。下图说明了主要术语,表格详细描述了图表组件。

Fundamentals

在上图中,一个主题配置为三个分区。分区1有两个偏移量因子0和1。分区2有四个偏移量因子0、1、2和3。分区3有一个偏移量因子0。副本的ID与其托管它的服务器的ID相同。

假设,如果主题的复制因子设置为3,则Kafka将创建每个分区的3个相同副本并将它们放置在集群中以使其可用于所有操作。为了平衡集群中的负载,每个代理存储一个或多个这些分区。多个生产者和消费者可以同时发布和检索消息。

序号 组件和描述
1

主题

属于特定类别的一系列消息称为主题。数据存储在主题中。

主题被分成分区。对于每个主题,Kafka至少保留一个分区。每个这样的分区都包含按不变顺序排列的消息。分区实现为一组大小相等的段文件。

2

分区

主题可能有多个分区,因此它可以处理任意数量的数据。

3

分区偏移量

每个分区消息都有一个唯一的序列ID,称为“偏移量”。

4

分区的副本

副本只不过是分区的“备份”。副本永远不会读取或写入数据。它们用于防止数据丢失。

5

代理

  • 代理是负责维护已发布数据的简单系统。每个代理可能对每个主题有零个或多个分区。假设,如果一个主题中有N个分区和N个代理,则每个代理将有一个分区。

  • 假设如果一个主题中有N个分区,并且代理多于N个(n+m),则前N个代理将有一个分区,接下来的M个代理将不会为此特定主题拥有任何分区。

  • 假设如果一个主题中有N个分区,并且代理少于N个(n-m),则每个代理将拥有一个或多个分区在它们之间共享。由于代理之间负载分配不均,不推荐这种情况。

6

Kafka集群

Kafka拥有多个代理被称为Kafka集群。Kafka集群可以在不停机的情况下扩展。这些集群用于管理消息数据的持久性和复制。

7

生产者

生产者是向一个或多个Kafka主题发布消息的发布者。生产者将数据发送到Kafka代理。每次生产者向代理发布消息时,代理只是将消息附加到最后一个段文件。实际上,消息将附加到分区。生产者也可以将消息发送到他们选择的分区。

8

消费者

消费者从代理读取数据。消费者订阅一个或多个主题,并通过从代理拉取数据来使用已发布的消息。

9

领导者

“领导者”是负责给定分区的所有读写操作的节点。每个分区都有一个充当领导者的服务器。

10

追随者

遵循领导者指令的节点称为追随者。如果领导者失败,一个追随者将自动成为新的领导者。追随者充当普通消费者,拉取消息并更新其自己的数据存储。

Apache Kafka - 集群架构

请看下面的图示。它显示了Kafka的集群图。

Cluster Architecture

下表描述了上图中显示的每个组件。

序号 组件和描述
1

代理

Kafka集群通常包含多个代理以保持负载平衡。Kafka代理是无状态的,因此它们使用ZooKeeper来维护其集群状态。一个Kafka代理实例每秒可以处理数十万次读写操作,每个代理可以处理TB级消息而不会影响性能。Kafka代理领导者选举可以由ZooKeeper完成。

2

ZooKeeper

ZooKeeper用于管理和协调Kafka代理。ZooKeeper服务主要用于通知生产者和消费者Kafka系统中任何新代理的存在或代理的故障。根据Zookeeper收到的关于代理存在或故障的通知,生产者和消费者做出决策,并开始与其他代理协调其任务。

3

生产者

生产者将数据推送到代理。当启动新的代理时,所有生产者都会搜索它并自动向该新代理发送消息。Kafka生产者不会等待代理的确认,并尽快发送消息,直到代理可以处理。

4

消费者

由于Kafka代理是无状态的,这意味着消费者必须使用分区偏移量来维护已消费的消息数量。如果消费者确认特定的消息偏移量,则意味着消费者已消费所有先前消息。消费者向代理发出异步拉取请求,以准备要消费的字节缓冲区。消费者只需提供偏移量值即可将分区中的任何点倒带或跳过。消费者偏移量值由ZooKeeper通知。

Apache Kafka - 工作流程

目前为止,我们讨论了Kafka的核心概念。现在让我们来了解一下Kafka的工作流程。

Kafka简单来说就是多个主题的集合,每个主题又可以分成一个或多个分区。Kafka分区是一个线性排序的消息序列,每条消息都由其索引(称为偏移量)标识。Kafka集群中的所有数据都是各个分区的无交集并集。传入的消息写入分区的末尾,消息由消费者顺序读取。通过将消息复制到不同的代理来提供持久性。

Kafka以快速、可靠、持久、容错和零停机的方式提供发布-订阅和基于队列的消息系统。在这两种情况下,生产者只需将消息发送到主题,消费者可以根据需要选择任何一种消息系统。让我们在下一节中按照步骤来了解消费者如何选择他们想要的消息系统。

发布-订阅消息的工作流程

以下是发布-订阅消息的逐步工作流程:

  • 生产者定期向主题发送消息。

  • Kafka代理将所有消息存储在为该特定主题配置的分区中。它确保消息在分区之间平均分配。如果生产者发送两条消息,并且有两个分区,Kafka将把一条消息存储在第一个分区中,另一条消息存储在第二个分区中。

  • 消费者订阅特定主题。

  • 一旦消费者订阅了主题,Kafka将向消费者提供该主题的当前偏移量,并将其保存在ZooKeeper集群中。

  • 消费者将定期(例如100毫秒)向Kafka请求新消息。

  • 一旦Kafka从生产者接收到消息,它就会将这些消息转发给消费者。

  • 消费者将接收消息并处理它。

  • 消息处理完毕后,消费者将向Kafka代理发送确认。

  • Kafka收到确认后,它会将偏移量更改为新值并在ZooKeeper中更新它。由于偏移量保存在ZooKeeper中,即使服务器发生故障,消费者也可以正确读取下一条消息。

  • 上述流程将重复,直到消费者停止请求。

  • 消费者可以选择随时回溯/跳过到主题的所需偏移量并读取所有后续消息。

队列消息/消费者组的工作流程

在队列消息系统中,不是单个消费者,而是一组具有相同“组ID”的消费者将订阅主题。简单来说,使用相同“组ID”订阅主题的消费者被视为一个组,消息在它们之间共享。让我们检查一下这个系统的实际工作流程。

  • 生产者定期向主题发送消息。

  • Kafka将所有消息存储在为该特定主题配置的分区中,类似于之前的场景。

  • 单个消费者订阅特定主题,假设为“Topic-01”,其“组ID”为“Group-1”。

  • Kafka与消费者的交互方式与发布-订阅消息相同,直到新的消费者使用相同的“组ID”(“Group-1”)订阅相同的主题“Topic-01”。

  • 一旦新的消费者加入,Kafka就会切换到共享模式,并在两个消费者之间共享数据。这种共享将持续到消费者的数量达到为该特定主题配置的分区数量。

  • 一旦消费者的数量超过了分区的数量,新的消费者将不会收到任何进一步的消息,直到任何一个现有的消费者取消订阅。这种情况的出现是因为Kafka中的每个消费者至少会被分配一个分区,一旦所有分区都被分配给现有的消费者,新的消费者就必须等待。

  • 此功能也称为“消费者组”。同样,Kafka将以非常简单和高效的方式提供这两个系统的优点。

ZooKeeper的作用

Apache Kafka的一个关键依赖项是Apache ZooKeeper,它是一个分布式配置和同步服务。ZooKeeper充当Kafka代理和消费者之间的协调接口。Kafka服务器通过ZooKeeper集群共享信息。Kafka在ZooKeeper中存储基本元数据,例如关于主题、代理、消费者偏移量(队列读取器)等信息。

由于所有关键信息都存储在ZooKeeper中,并且它通常在其集群中复制此数据,因此Kafka代理/ZooKeeper的故障不会影响Kafka集群的状态。Kafka将在ZooKeeper重启后恢复状态。这为Kafka提供了零停机时间。Kafka代理之间的领导者选举也是在领导者故障时使用ZooKeeper完成的。

要了解更多关于ZooKeeper的信息,请参考 zookeeper

让我们在下一章继续学习如何在你的机器上安装Java、ZooKeeper和Kafka。

Apache Kafka - 安装步骤

以下是安装Java的步骤:

步骤1 - 验证Java安装

希望你现在已经在你机器上安装了Java,所以你只需要使用以下命令验证它。

$ java -version

如果Java已成功安装在你的机器上,你将看到已安装Java的版本。

步骤1.1 - 下载JDK

如果Java未下载,请访问以下链接下载最新版本的JDK并下载最新版本。

http://www.oracle.com/technetwork/java/javase/downloads/index.html

现在最新版本是JDK 8u 60,文件名为“jdk-8u60-linux-x64.tar.gz”。请将文件下载到你的机器上。

步骤1.2 - 解压文件

通常情况下,下载的文件存储在downloads文件夹中,请验证并使用以下命令解压tar安装包。

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

步骤1.3 - 移动到Opt目录

为了使所有用户都可以使用Java,请将解压后的Java内容移动到“usr/local/java”文件夹。

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

步骤1.4 - 设置路径

要设置路径和JAVA_HOME变量,请将以下命令添加到~/.bashrc文件中。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

现在将所有更改应用到当前运行的系统。

$ source ~/.bashrc

步骤1.5 - Java Alternatives

使用以下命令更改Java Alternatives。

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

步骤1.6 − 现在使用步骤1中解释的验证命令(java -version)验证java。

步骤2 - ZooKeeper框架安装

步骤2.1 - 下载ZooKeeper

要在你的机器上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper。

https://zookeeper.net.cn/releases.html

目前,ZooKeeper的最新版本是3.4.6(ZooKeeper-3.4.6.tar.gz)。

步骤2.2 - 解压tar文件

使用以下命令解压tar文件

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

步骤2.3 - 创建配置文件

使用命令vi “conf/zoo.cfg”打开名为“conf/zoo.cfg”的配置文件,并将所有以下参数设置为起始点。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

配置文件保存成功并返回终端后,您可以启动ZooKeeper服务器。

步骤2.4 - 启动ZooKeeper服务器

$ bin/zkServer.sh start

执行此命令后,您将获得如下所示的响应:

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

步骤2.5 - 启动CLI

$ bin/zkCli.sh

输入上述命令后,您将连接到ZooKeeper服务器,并将获得以下响应。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

步骤2.6 - 停止ZooKeeper服务器

连接服务器并执行所有操作后,您可以使用以下命令停止ZooKeeper服务器:

$ bin/zkServer.sh stop

现在你已经成功地在你的机器上安装了Java和ZooKeeper。让我们看看安装Apache Kafka的步骤。

步骤3 - Apache Kafka安装

让我们继续按照以下步骤在你的机器上安装Kafka。

步骤3.1 - 下载Kafka

要在你的机器上安装Kafka,请点击以下链接:

https://apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

现在,最新版本,即kafka_2.11_0.9.0.0.tgz将下载到你的机器上。

步骤3.2 - 解压tar文件

使用以下命令解压tar文件:

$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

现在你已经将Kafka的最新版本下载到你的机器上了。

步骤3.3 - 启动服务器

您可以通过执行以下命令启动服务器:

$ bin/kafka-server-start.sh config/server.properties

服务器启动后,您将在屏幕上看到以下响应:

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

步骤4 - 停止服务器

执行所有操作后,您可以使用以下命令停止服务器:

$ bin/kafka-server-stop.sh config/server.properties

既然我们已经讨论了Kafka的安装,我们可以在下一章学习如何在Kafka上执行基本操作。

Apache Kafka - 基本操作

首先让我们开始实现“单节点-单个代理”配置,然后我们将把我们的设置迁移到单节点-多个代理配置。

希望你现在已经在你的机器上安装了Java、ZooKeeper和Kafka。在迁移到Kafka集群设置之前,你需要先启动ZooKeeper,因为Kafka集群使用ZooKeeper。

启动ZooKeeper

打开一个新的终端并输入以下命令:

bin/zookeeper-server-start.sh config/zookeeper.properties

要启动Kafka代理,请键入以下命令:

bin/kafka-server-start.sh config/server.properties

启动Kafka代理后,在ZooKeeper终端上键入命令“jps”,你将看到以下响应:

821 QuorumPeerMain
928 Kafka
931 Jps

现在你可以看到终端上运行着两个守护进程,其中QuorumPeerMain是ZooKeeper守护进程,另一个是Kafka守护进程。

单节点-单个代理配置

在此配置中,你只有一个ZooKeeper和代理ID实例。以下是配置步骤:

创建Kafka主题 − Kafka提供了一个名为“kafka-topics.sh”的命令行实用程序来在服务器上创建主题。打开新的终端并键入以下示例。

语法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

我们刚刚创建了一个名为“Hello-Kafka”的主题,它只有一个分区和一个副本因子。上面创建的输出将类似于以下输出:

输出 − 创建主题“Hello-Kafka”

主题创建后,你可以在Kafka代理终端窗口中收到通知,并在config/server.properties文件中指定的“/tmp/kafka-logs/”中看到已创建主题的日志。

主题列表

要获取Kafka服务器中的主题列表,可以使用以下命令:

语法

bin/kafka-topics.sh --list --zookeeper localhost:2181

输出

Hello-Kafka

由于我们已经创建了一个主题,它将只列出“Hello-Kafka”。假设你创建了多个主题,你将在输出中看到主题名称。

启动生产者发送消息

语法

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

从上面的语法中,生产者命令行客户端需要两个主要参数:

代理列表 (Broker-list) − 我们要向其发送消息的代理列表。在本例中,我们只有一个代理。Config/server.properties 文件包含代理端口 ID,由于我们知道我们的代理正在 9092 端口监听,因此可以直接指定它。

主题名称 − 这是一个主题名称的示例。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生产者将等待来自标准输入 (stdin) 的输入,并发布到 Kafka 集群。默认情况下,每行新内容都会作为一条新消息发布,然后在 config/producer.properties 文件中指定默认生产者属性。现在,您可以在终端中输入几行消息,如下所示。

输出

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

启动消费者接收消息

与生产者类似,默认消费者属性在 config/consumer.properties 文件中指定。打开一个新的终端,并键入以下语法来消费消息。

语法

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

输出

Hello
My first message
My second message

最后,您可以从生产者的终端输入消息,并看到它们出现在消费者的终端中。目前为止,您已经对具有单个代理的单节点集群有了很好的理解。现在让我们继续讨论多个代理的配置。

单节点-多个代理配置

在继续进行多个代理集群设置之前,首先启动您的 ZooKeeper 服务器。

创建多个 Kafka 代理 − 我们已经在 config/server.properties 中已经有了一个 Kafka 代理实例。现在我们需要多个代理实例,因此将现有的 server.properties 文件复制到两个新的配置文件中,并将其重命名为 server-one.properties 和 server-two.properties。然后编辑这两个新文件并进行以下更改 −

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

启动多个代理 − 对三个服务器进行所有更改后,打开三个新的终端,一次启动一个代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

现在我们有三台不同的代理在机器上运行。您可以自己尝试一下,通过在 ZooKeeper 终端键入 jps 来检查所有守护进程,然后您将看到响应。

创建主题

让我们为这个主题分配复制因子值为 3,因为我们有三个不同的代理正在运行。如果您有两个代理,则分配的副本值将为 2。

语法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

输出

created topic “Multibrokerapplication”

Describe 命令用于检查哪个代理正在监听当前创建的主题,如下所示 −

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

输出

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

从以上输出中,我们可以得出结论,第一行总结了所有分区,显示主题名称、分区计数和我们已经选择的复制因子。在第二行中,每个节点都将成为随机选择的一部分分区的领导者。

在我们的例子中,我们看到我们的第一个代理(broker.id 为 0)是领导者。然后 Replicas:0,2,1 表示所有代理最终都会复制主题,Isr同步副本的集合。好吧,这是当前处于活动状态并与领导者同步的副本的子集。

启动生产者发送消息

此过程与单代理设置中的过程相同。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

输出

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

启动消费者接收消息

此过程与单代理设置中所示的过程相同。

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

输出

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本主题操作

本章将讨论各种基本主题操作。

修改主题

您已经了解了如何在 Kafka 集群中创建主题。现在让我们使用以下命令修改已创建的主题

语法

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

示例

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

输出

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

删除主题

要删除主题,可以使用以下语法。

语法

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

示例

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

输出

> Topic Hello-kafka marked for deletion

注意 −如果 delete.topic.enable 未设置为 true,则此操作将不会产生任何影响。

Apache Kafka - 简单生产者示例

让我们创建一个应用程序,使用 Java 客户端发布和消费消息。Kafka 生产者客户端包含以下 API。

KafkaProducer API

让我们在本节中了解 Kafka 生产者 API 中最重要的一组 API。KafkaProducer API 的核心部分是 KafkaProducer 类。KafkaProducer 类提供了一个选项,可以在其构造函数中连接 Kafka 代理,并使用以下方法。

  • KafkaProducer 类提供 send 方法以异步方式将消息发送到主题。send() 的签名如下所示

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord − 生产者管理一个等待发送的记录缓冲区。

  • 回调 (Callback) − 用户提供的回调函数,在服务器确认记录后执行(null 表示没有回调)。

  • KafkaProducer 类提供 flush 方法来确保所有先前发送的消息都已实际完成。flush 方法的语法如下 −

public void flush()
  • KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。此方法的签名如下 −

public Map metrics()

它返回生产者维护的内部指标映射。

  • public void close() − KafkaProducer 类提供 close 方法,该方法会阻塞直到所有先前发送的请求都已完成。

生产者 API

生产者 API 的核心部分是 Producer 类。生产者类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。

生产者类

生产者类提供 send 方法来发送消息到单个或多个主题,使用以下签名。

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

生产者有两种类型:同步 (Sync)异步 (Async)

相同的 API 配置也适用于 Sync 生产者。它们之间的区别在于同步生产者直接发送消息,而异步生产者在后台发送消息。当您需要更高的吞吐量时,首选异步生产者。在之前的版本(如 0.8)中,异步生产者没有 send() 的回调来注册错误处理程序。这仅在当前的 0.9 版本中可用。

public void close()

生产者类提供close方法来关闭生产者池与所有 Kafka 代理的连接。

配置设置

为了更好地理解,生产者 API 的主要配置设置列在下表中 −

序号 配置设置和说明
1

client.id

标识生产者应用程序

2

producer.type

同步或异步

3

acks

acks 配置控制生产者请求被认为完成的标准。

4

retries

如果生产者请求失败,则会使用特定值自动重试。

5

bootstrap.servers

代理的引导列表。

6

linger.ms

如果要减少请求数量,可以将 linger.ms 设置为大于某个值。

7

key.serializer

序列化器接口的键。

8

value.serializer

序列化器接口的值。

9

batch.size

缓冲区大小。

10

buffer.memory

控制生产者可用于缓冲的总内存量。

ProducerRecord API

ProducerRecord 是发送到 Kafka 集群的键值对。ProducerRecord 类构造函数用于使用以下签名创建带有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主题 (Topic) − 将附加到记录的用户定义主题名称。

  • 分区 (Partition) − 分区计数

  • 键 (Key) − 将包含在记录中的键。

  • 值 (Value) − 记录内容
public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建具有键值对且没有分区的记录。

  • 主题 (Topic) − 创建一个主题来分配记录。

  • 键 (Key) − 记录的键。

  • 值 (Value) − 记录内容。

public ProducerRecord (string topic, v value)

ProducerRecord 类创建没有分区和键的记录。

  • 主题 (Topic) − 创建一个主题。

  • 值 (Value) − 记录内容。

ProducerRecord 类的函数列在下表中 −

序号 类方法和说明
1

public string topic()

将附加到记录的主题。

2

public K key()

将包含在记录中的键。如果没有此类键,则此处将返回 null。

3

public V value()

记录内容。

4

partition()

记录的分区计数

SimpleProducer 应用程序

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建您自己的主题。之后,创建一个名为 SimpleProducer.java 的 Java 类,并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

编译 − 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 − 可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

输出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

简单消费者示例

现在我们已经创建了一个生产者来向 Kafka 集群发送消息。现在让我们创建一个消费者来从 Kafka 集群消费消息。KafkaConsumer API 用于从 Kafka 集群消费消息。KafkaConsumer 类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − 返回消费者配置的映射。

KafkaConsumer 类具有以下重要的函数,列在下表中。

序号 方法和说明
1

public java.util.Set<TopicPartition> assignment()

获取消费者当前分配的分区集合。

2

public string subscription()

订阅给定的主题列表以动态获取分配的分区。

3

public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener)

订阅给定的主题列表以动态获取分配的分区。

4

public void unsubscribe()

取消订阅给定分区列表中的主题。

5

public void subscribe(java.util.List<java.lang.String> topics)

订阅给定的主题列表以动态获取分配的分区。如果给定的主题列表为空,则其处理方式与 unsubscribe() 相同。

6

public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)

参数 pattern 指的是正则表达式格式的订阅模式,参数 listener 获取来自订阅模式的通知。

7

public void assign(java.util.List<TopicPartition> partitions)

手动将分区列表分配给消费者。

8

poll()

获取使用 subscribe/assign API 之一指定主题或分区的数据。如果在轮询数据之前没有订阅主题,则将返回错误。

9

public void commitSync()

为所有已订阅的主题和分区提交上次 poll() 返回的偏移量。相同的操作应用于 commitAsync()。

10

public void seek(TopicPartition partition, long offset)

获取消费者将在下一个 poll() 方法中使用的当前偏移量值。

11

public void resume()

恢复已暂停的分区。

12

public void wakeup()

唤醒消费者。

ConsumerRecord API

ConsumerRecord API 用于接收来自 Kafka 集群的记录。此 API 包含主题名称、分区编号(从中接收记录)以及指向 Kafka 分区中记录的偏移量。ConsumerRecord 类用于创建具有特定主题名称、分区计数和 对的消费者记录。它的签名如下所示。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主题 (Topic) − 从 Kafka 集群接收到的消费者记录的主题名称。

  • 分区 (Partition) − 主题的分区。

  • 键 (Key) − 记录的键,如果不存在键,则返回 null。

  • 值 (Value) − 记录内容。

ConsumerRecords API

ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于保存特定主题每个分区的 ConsumerRecord 列表。其构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition − 返回特定主题的分区映射。

  • Records − 返回 ConsumerRecord 列表。

ConsumerRecords 类定义了以下方法。

序号 方法和描述
1

public int count()

所有主题的记录数量。

2

public Set partitions()

此记录集中包含数据的分区集(如果未返回数据,则该集为空)。

3

public Iterator iterator()

迭代器使您可以循环遍历集合,获取或移除元素。

4

public List records()

获取给定分区的记录列表。

配置设置

Consumer 客户端 API 的配置设置,主要的配置设置如下所示:

序号 设置和描述
1

bootstrap.servers

引导程序代理列表。

2

group.id

将单个消费者分配给一个组。

3

enable.auto.commit

如果值为 true,则启用偏移量的自动提交,否则不提交。

4

auto.commit.interval.ms

返回多久将更新的已消费偏移量写入 ZooKeeper。

5

session.timeout.ms

指示 Kafka 在放弃并继续消费消息之前,将等待 ZooKeeper 多久才能响应请求(读取或写入)。

SimpleConsumer 应用

生产者应用程序的步骤在这里保持不变。首先,启动您的 ZooKeeper 和 Kafka 代理。然后创建一个名为 “SimpleConsumer.java” 的 java 类,并输入以下代码,创建 SimpleConsumer 应用程序。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

编译 − 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 − 可以使用以下命令执行应用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入 − 打开生产者 CLI 并向主题发送一些消息。您可以将示例输入设置为“Hello Consumer”。

输出 − 输出如下所示。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Apache Kafka - 消费者组示例

消费者组是从 Kafka 主题进行多线程或多机器消费。

消费者组

  • 消费者可以使用相同的 group.id 加入组。

  • 组的最大并行度是组中消费者的数量 ← 分区的数量。

  • Kafka 将主题的分区分配给组中的消费者,以便组中的每个消费者只消费一个分区。

  • Kafka 保证一条消息只会被组中的一个消费者读取。

  • 消费者可以按他们在日志中存储的顺序查看消息。

消费者的重新平衡

添加更多进程/线程将导致 Kafka 重新平衡。如果任何消费者或代理未能向 ZooKeeper 发送心跳,则可以通过 Kafka 集群重新配置它。在此重新平衡期间,Kafka 将把可用的分区分配给可用的线程,可能会将分区移动到另一个进程。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

编译

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

执行

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

在这里,我们创建了一个名为 my-group 的示例组,其中包含两个消费者。类似地,您可以创建您的组和组中的消费者数量。

输入

打开生产者 CLI 并发送一些消息,例如:

Test consumer group 01
Test consumer group 02

第一个进程的输出

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

第二个进程的输出

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

现在,希望您已经了解了如何使用 Java 客户端演示来使用 SimpleConsumer 和 ConsumeGroup。现在您了解了如何使用 Java 客户端发送和接收消息。让我们在下一章继续 Kafka 与大数据技术的集成。

Apache Kafka - 与 Storm 集成

在本章中,我们将学习如何将 Kafka 与 Apache Storm 集成。

关于 Storm

Storm 最初由 BackType 的 Nathan Marz 和团队创建。在很短的时间内,Apache Storm 成为分布式实时处理系统的标准,允许您处理海量数据。Storm 非常快,基准测试显示其每个节点每秒处理超过一百万个元组。Apache Storm 持续运行,从配置的源(Spout)中消费数据,并将数据传递到处理管道(Bolt)。Spout 和 Bolt 组合构成一个 Topology。

与 Storm 集成

Kafka 和 Storm 自然地相互补充,它们强大的合作使您可以对快速移动的大数据进行实时流分析。Kafka 和 Storm 集成是为了使开发人员更容易地从 Storm topology 中提取和发布数据流。

概念流程

Spout 是流的来源。例如,Spout 可以从 Kafka 主题读取元组并将其作为流发出。Bolt 消费输入流,处理并可能发出新的流。Bolt 可以执行任何操作,从运行函数、过滤元组、进行流聚合、流连接、与数据库通信等等。Storm topology 中的每个节点都并行执行。topology 会无限期运行,直到您终止它。Storm 将自动重新分配任何失败的任务。此外,Storm 保证不会丢失数据,即使机器出现故障并且消息丢失。

让我们详细了解 Kafka-Storm 集成 API。将 Kafka 与 Storm 集成的三个主要类如下:

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts 是一个接口,ZkHosts 和 StaticHosts 是其两个主要实现。ZkHosts 用于通过维护 ZooKeeper 中的详细信息来动态跟踪 Kafka 代理,而 StaticHosts 用于手动/静态设置 Kafka 代理及其详细信息。ZkHosts 是访问 Kafka 代理的简单快捷方法。

ZkHosts 的签名如下:

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

其中 brokerZkStr 是 ZooKeeper 主机,brokerZkPath 是维护 Kafka 代理详细信息的 ZooKeeper 路径。

KafkaConfig API

此 API 用于定义 Kafka 集群的配置设置。Kafka Config 的签名定义如下

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts − BrokerHosts 可以是 ZkHosts/StaticHosts。

    Topic − 主题名称。

SpoutConfig API

Spoutconfig 是 KafkaConfig 的扩展,支持其他 ZooKeeper 信息。

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts − BrokerHosts 可以是 BrokerHosts 接口的任何实现

  • Topic − 主题名称。

  • zkRoot − ZooKeeper 根路径。

  • id − Spout 将其消费的偏移量的状态存储在 Zookeeper 中。id 应唯一标识您的 spout。

SchemeAsMultiScheme

SchemeAsMultiScheme 是一个接口,它决定了从 Kafka 消费的 ByteBuffer 如何转换为 storm 元组。它派生自 MultiScheme 并接受 Scheme 类的实现。Scheme 类有很多实现,其中一种实现是 StringScheme,它将字节解析为简单的字符串。它还控制输出字段的命名。签名定义如下。

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme − 从 kafka 消费的字节缓冲区。

KafkaSpout API

KafkaSpout 是我们的 spout 实现,它将与 Storm 集成。它从 kafka 主题中获取消息,并将其作为元组发出到 Storm 生态系统中。KafkaSpout 从 SpoutConfig 获取其配置详细信息。

下面是创建简单的 Kafka spout 的示例代码。

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Bolt 创建

Bolt 是一个组件,它接收元组作为输入,处理元组,并产生新的元组作为输出。Bolt 将实现 IRichBolt 接口。在这个程序中,使用了两个 bolt 类 WordSplitter-Bolt 和 WordCounterBolt 来执行操作。

IRichBolt 接口具有以下方法:

  • Prepare − 为 bolt 提供执行环境。执行器将运行此方法来初始化 spout。

  • Execute − 处理单个输入元组。

  • Cleanup − 当 bolt 将要关闭时调用。

  • declareOutputFields − 声明元组的输出模式。

让我们创建 SplitBolt.java,它实现将句子拆分成单词的逻辑,以及 CountBolt.java,它实现分离唯一单词并计算其出现次数的逻辑。

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

提交到 Topology

Storm topology 基本上是一个 Thrift 结构。TopologyBuilder 类提供简单易用的方法来创建复杂的 topology。TopologyBuilder 类具有设置 spout (setSpout) 和设置 bolt (setBolt) 的方法。最后,TopologyBuilder 有 createTopology 来创建 topology。shuffleGrouping 和 fieldsGrouping 方法有助于为 spout 和 bolt 设置流分组。

本地集群 − 出于开发目的,我们可以使用 LocalCluster 对象创建一个本地集群,然后使用 LocalCluster 类的 submitTopology 方法提交 topology。

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

在进行编译之前,Kakfa-Storm 集成需要 curator ZooKeeper 客户端 Java 库。Curator 2.9.1 版本支持 Apache Storm 0.9.5 版本(我们在本教程中使用)。下载下面指定的 jar 文件并将其放在 Java 类路径中。

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

包含依赖文件后,使用以下命令编译程序:

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

执行

启动 Kafka Producer CLI(在上一章中解释),创建一个名为 my-first-topic 的新主题,并提供一些示例消息,如下所示:

hello
kafka
storm
spark
test message
another test message

现在使用以下命令执行应用程序:

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

此应用程序的示例输出如下所示:

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

Apache Kafka - 与 Spark 集成

在本章中,我们将讨论如何将 Apache Kafka 与 Spark Streaming API 集成。

关于 Spark

Spark Streaming API 能够对实时数据流进行可扩展、高吞吐量、容错的流处理。数据可以从许多来源(如 Kafka、Flume、Twitter 等)提取,并可以使用复杂的算法(例如 map、reduce、join 和 window 等高级函数)进行处理。最后,处理后的数据可以推送到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是一个不可变的分布式对象集合。RDD 中的每个数据集都细分为逻辑分区,这些分区可以在集群的不同节点上计算。

与 Spark 集成

Kafka 是 Spark 流式处理的潜在消息传递和集成平台。Kafka 充当实时数据流的中心枢纽,并使用 Spark Streaming 中的复杂算法进行处理。一旦数据被处理,Spark Streaming 就可以将结果发布到另一个 Kafka 主题,或者存储在 HDFS、数据库或仪表板中。下图描述了概念流程。

Integration with Spark

现在,让我们详细了解 Kafka-Spark API。

SparkConf API

它表示 Spark 应用程序的配置。用于将各种 Spark 参数设置为键值对。

SparkConf 类具有以下方法:

  • set(string key, string value) − 设置配置变量。

  • remove(string key) − 从配置中删除键。

  • setAppName(string name) − 为您的应用程序设置应用程序名称。

  • get(string key) − 获取键

StreamingContext API

这是 Spark 功能的主要入口点。SparkContext 表示与 Spark 集群的连接,可用于在集群上创建 RDD、累加器和广播变量。签名定义如下所示。

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master − 要连接到的集群 URL(例如 mesos://host:port、spark://host:port、local[4])。

  • appName − 作业的名称,显示在集群 Web UI 上

  • batchDuration − 流数据将被划分为批次的 时间间隔

public StreamingContext(SparkConf conf, Duration batchDuration)

通过提供新的 SparkContext 所需的配置来创建 StreamingContext。

  • conf − Spark 参数

  • batchDuration − 流数据将被划分为批次的 时间间隔

KafkaUtils API

KafkaUtils API 用于将 Kafka 集群连接到 Spark Streaming。此 API 包含一个重要的名为 createStream 的方法,其签名如下所示。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上述方法用于创建一个输入流,该流从 Kafka Broker 拉取消息。

  • ssc − StreamingContext 对象。

  • zkQuorum − ZooKeeper 集群地址。

  • groupId − 此消费者的组 ID。

  • topics − 返回要消费的主题映射。

  • storageLevel − 用于存储接收到的对象的存储级别。

KafkaUtils API 还有另一个方法 createDirectStream,用于创建一个输入流,该流直接从 Kafka Broker 拉取消息,无需使用任何接收器。此流可以保证 Kafka 中的每条消息都只在转换中包含一次。

示例应用程序是用 Scala 编写的。要编译该应用程序,请下载并安装 sbt,这是一个 Scala 构建工具(类似于 Maven)。主要的应用程序代码如下所示。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

构建脚本

spark-kafka 集成依赖于 spark、spark streaming 和 spark Kafka 集成 jar 包。创建一个名为 build.sbt 的新文件,并指定应用程序的详细信息及其依赖项。sbt 将在编译和打包应用程序时下载必要的 jar 包。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

编译/打包

运行以下命令来编译和打包应用程序的 jar 文件。我们需要将 jar 文件提交到 Spark 控制台才能运行应用程序。

sbt package

提交到 Spark

启动 Kafka Producer CLI(在上一章中解释),创建一个名为 my-first-topic 的新主题,并提供一些示例消息,如下所示。

Another spark test message

运行以下命令将应用程序提交到 Spark 控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

此应用程序的示例输出如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

实时应用(Twitter)

让我们分析一个实时应用程序,以获取最新的 Twitter 馈送及其主题标签。前面,我们已经看到了 Storm 和 Spark 与 Kafka 的集成。在这两种情况下,我们都创建了一个 Kafka Producer(使用 cli)来向 Kafka 生态系统发送消息。然后,Storm 和 Spark 集成使用 Kafka Consumer 读取消息,并分别将其注入到 Storm 和 Spark 生态系统中。因此,实际上我们需要创建一个 Kafka Producer,它应该:

  • 使用“Twitter Streaming API”读取 Twitter 馈送,
  • 处理馈送,
  • 提取主题标签,以及
  • 将其发送到 Kafka。

一旦 Kafka 接收到 主题标签,Storm/Spark 集成就会接收信息并将其发送到 Storm/Spark 生态系统。

Twitter Streaming API

可以使用任何编程语言访问“Twitter Streaming API”。“twitter4j”是一个开源的、非官方的 Java 库,它提供了一个基于 Java 的模块,可以轻松访问“Twitter Streaming API”。“twitter4j”提供了一个基于监听器的框架来访问推文。要访问“Twitter Streaming API”,我们需要注册 Twitter 开发者帐户,并获取以下OAuth身份验证详细信息。

  • 客户密钥 (Customer key)
  • 客户密钥密码 (Customer secret)
  • 访问令牌 (Access token)
  • 访问令牌密码 (Access token secret)

创建开发者帐户后,下载“twitter4j”jar 文件并将其放入 Java 类路径。

完整的 Twitter Kafka Producer 代码 (KafkaTwitterProducer.java) 如下所示:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

编译

使用以下命令编译应用程序:

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

执行

打开两个控制台。在一个控制台中,运行上面编译的应用程序,如下所示。

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

在另一个窗口中运行上一章中解释的任何一个 Spark/Storm 应用程序。需要注意的主要一点是,在这两种情况下使用的主题应该相同。在这里,我们使用“my-first-topic”作为主题名称。

输出

此应用程序的输出将取决于关键字和 Twitter 的当前馈送。下面是一个示例输出(Storm 集成)。

. . .
food : 1
foodie : 2
burger : 1
. . .

Apache Kafka - 工具

打包在“org.apache.kafka.tools.*”下的 Kafka 工具。工具分为系统工具和复制工具。

系统工具

可以使用 run class 脚本从命令行运行系统工具。语法如下:

bin/kafka-run-class.sh package.class - - options

下面提到一些系统工具:

  • Kafka 迁移工具 − 此工具用于将代理从一个版本迁移到另一个版本。

  • 镜像制作器 (Mirror Maker) − 此工具用于将一个 Kafka 集群镜像到另一个集群。

  • 消费者偏移量检查器 (Consumer Offset Checker) − 此工具显示指定主题和消费者组的消费者组、主题、分区、偏移量、日志大小和所有者。

复制工具

Kafka 复制是一个高级设计工具。添加复制工具的目的是为了更强的持久性和更高的可用性。下面提到一些复制工具:

  • 创建主题工具 (Create Topic Tool) − 这将创建一个具有默认分区数、复制因子并使用 Kafka 的默认方案进行副本分配的主题。

  • 列出主题工具 (List Topic Tool) − 此工具列出给定主题列表的信息。如果命令行中未提供任何主题,则该工具将查询 ZooKeeper 以获取所有主题并列出它们的信息。该工具显示的字段是主题名称、分区、领导者、副本和 ISR。

  • 添加分区工具 (Add Partition Tool) − 创建主题时,必须指定主题的分区数。之后,当主题的容量增加时,可能需要为主题添加更多分区。此工具有助于为特定主题添加更多分区,并允许手动分配添加的分区的副本。

Apache Kafka - 应用

Kafka 支持当今许多最佳的工业应用。本章将简要概述 Kafka 在一些最值得注意的应用中的应用。

Twitter

Twitter 是一种在线社交网络服务,提供了一个发送和接收用户推文的平台。注册用户可以阅读和发布推文,但未注册用户只能阅读推文。Twitter 使用 Storm-Kafka 作为其流处理基础架构的一部分。

LinkedIn

LinkedIn 使用 Apache Kafka 来处理活动流数据和运营指标。Kafka 消息系统帮助 LinkedIn 提供各种产品,例如 LinkedIn 信息流、LinkedIn 今日新闻(用于在线消息消费)以及离线分析系统(例如 Hadoop)。Kafka 的强大持久性也是与 LinkedIn 连接的关键因素之一。

Netflix

Netflix 是一家美国跨国公司,提供按需互联网流媒体服务。Netflix 使用 Kafka 进行实时监控和事件处理。

Mozilla

Mozilla 是一个自由软件社区,由 Netscape 成员于 1998 年创建。Kafka 将很快取代 Mozilla 当前生产系统的一部分,用于从最终用户的浏览器收集性能和使用数据,用于遥测、测试飞行员等项目。

Oracle

Oracle 从其名为 OSB(Oracle Service Bus)的企业服务总线产品中提供对 Kafka 的原生连接,这允许开发人员利用 OSB 内置的调解功能来实现分阶段数据管道。

广告