- Apache Kafka 教程
- Apache Kafka - 首页
- Apache Kafka - 简介
- Apache Kafka - 基础知识
- Apache Kafka - 集群架构
- Apache Kafka - 工作流程
- Apache Kafka - 安装步骤
- Apache Kafka - 基本操作
- 简单的生产者示例
- 消费者组示例
- 与Storm集成
- 与Spark集成
- 实时应用(Twitter)
- Apache Kafka - 工具
- Apache Kafka - 应用
- Apache Kafka 有用资源
- Apache Kafka - 快速指南
- Apache Kafka - 有用资源
- Apache Kafka - 讨论
Apache Kafka - 与Spark集成
本章将讨论如何将Apache Kafka与Spark Streaming API集成。
关于Spark
Spark Streaming API 能够对实时数据流进行可扩展、高吞吐量、容错的流处理。数据可以从许多来源导入,例如Kafka、Flume、Twitter等,并可以使用复杂的算法(例如map、reduce、join和window等高级函数)进行处理。最后,处理后的数据可以推送到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是Spark的基本数据结构。它是一个不可变的分布式对象集合。RDD中的每个数据集都划分为逻辑分区,这些分区可以在集群的不同节点上计算。
与Spark集成
Kafka是一个潜在的用于Spark流处理的消息传递和集成平台。Kafka充当实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理。数据处理完成后,Spark Streaming可以将结果发布到另一个Kafka主题,或者存储在HDFS、数据库或仪表板中。下图描述了概念流程。
现在,让我们详细了解Kafka-Spark API。
SparkConf API
它表示Spark应用程序的配置。用于将各种Spark参数设置为键值对。
SparkConf
类具有以下方法:
set(string key, string value) − 设置配置变量。
remove(string key) − 从配置中删除键。
setAppName(string name) − 为您的应用程序设置应用程序名称。
get(string key) − 获取键
StreamingContext API
这是Spark功能的主要入口点。SparkContext表示与Spark集群的连接,可用于在集群上创建RDD、累加器和广播变量。签名定义如下所示。
public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
master − 要连接到的集群URL(例如mesos://host:port,spark://host:port,local[4])。
appName − 您的作业的名称,用于在集群Web UI上显示
batchDuration − 流数据将被分成批次的时间间隔
public StreamingContext(SparkConf conf, Duration batchDuration)
通过提供新的SparkContext所需的配置来创建StreamingContext。
conf − Spark参数
batchDuration − 流数据将被分成批次的时间间隔
KafkaUtils API
KafkaUtils API 用于将Kafka集群连接到Spark Streaming。此API具有重要的createStream
方法,签名如下所示。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream( StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上述方法用于创建一个输入流,该输入流从Kafka Broker拉取消息。
ssc − StreamingContext对象。
zkQuorum − Zookeeper集群地址。
groupId − 此消费者的组ID。
topics − 返回要消费的主题映射。
storageLevel − 用于存储接收到的对象的存储级别。
KafkaUtils API 还有另一个方法createDirectStream,它用于创建一个输入流,该输入流直接从Kafka Broker拉取消息,而无需使用任何接收器。此流可以保证Kafka中的每条消息都只包含在转换中一次。
示例应用程序是用Scala编写的。要编译应用程序,请下载并安装sbt
,Scala构建工具(类似于maven)。主要应用程序代码如下所示。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
构建脚本
spark-kafka集成依赖于spark、spark streaming和spark Kafka集成jar包。创建一个新文件build.sbt
并指定应用程序详细信息及其依赖项。sbt
将在编译和打包应用程序时下载必要的jar包。
name := "Spark Kafka Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
编译/打包
运行以下命令来编译和打包应用程序的jar文件。我们需要将jar文件提交到spark控制台才能运行应用程序。
sbt package
提交到Spark
启动Kafka Producer CLI(在上一章中解释),创建一个名为my-first-topic
的新主题,并提供一些示例消息,如下所示。
Another spark test message
运行以下命令将应用程序提交到spark控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
此应用程序的示例输出如下所示。
spark console messages .. (Test,1) (spark,1) (another,1) (message,1) spark console message ..