Spark 中的 RDD 共享变量


RDD的全称是弹性分布式数据集。Spark 的性能基于这个容错的数据集,使其能够有效地处理各种大数据处理任务,包括 MapReduce、流处理、SQL、机器学习、图计算等。

Spark 支持多种编程语言,包括 Scala、Python 和 R。RDD 也支持这些语言的持久化。

如何创建 RDD

Spark 在许多地方支持 RDD 架构,包括本地文件系统、HDFS 文件系统、内存和 HBase。

对于本地文件系统,我们可以通过以下方式创建 RDD:

val distFile = sc.textFile("file:///user/root/rddData.txt")

默认情况下,Spark 从 HDFS 文件系统获取数据。因此,在 HDFS 文件系统中创建 RDD 的方法如下:

val distFile = sc.textFile("/user/root/rddData.txt")

用户也可以通过以下方式指定 HDFS URL:

val distFile = sc.textFile("hdfs://:4440/user/rddData.txt")

RDD 共享变量

如果任何函数在 Spark 中传播到执行函数,它都会应用于集群节点。Spark 使用计算中使用的每个变量的不同副本。这些更改会复制到每台机器,并且不会将对远程设备的动态更新恢复到驱动程序系统。

如果远程节点执行 Spark 的执行函数来工作,系统会将所有函数变量复制到该节点。如果这些变量在其他节点上更新,系统将不会更新当前节点变量,直到它被恢复到驱动程序系统。通常,在所有活动中灵活的读写能力效果不佳。Spark 使用两种类型的共享变量:

累加器

类似于 C 语言中的全局变量,Spark 支持多个函数更新同一个变量。这允许多个任务顺序地更新同一个变量。

创建累加器时,可以使用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 创建长整型和双精度型两种类型的累加器。任务可以使用 add 方法向累加器添加值。执行器无法读取累加器变量的值,只有驱动程序才能读取其最终值。

scala> val accum = sc.longAccumulator("Accumulator Data")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(Accumulator Data), value: 0)
scala> sc.parallelize(Array(6, 7, 8, 9)).foreach(x => accum.add(x))
22/02/09 01:37:51 INFO SparkContext: Tasks finished in 0.274529s

广播变量

广播变量允许开发者在每个位置存储一个只读的只读副本,而无需将其复制到每个任务。这使得系统可以有效地将大型数据集复制到每个节点。Spark 通过使用广播变量来减少网络传输成本。

只有当任务有多个阶段,需要相同的数据,或者缓存的数据被反序列化时,广播变量才有效。要创建广播变量,可以使用以下命令:

scala> val broadcastVar = sc.broadcast(Array(6, 7, 8))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(6, 7, 8)

找到值字段中的广播变量。顾名思义,广播变量是单向从驱动程序发送到任务的。系统无法更新广播变量,驱动程序也无法更新它们。确保所有节点都接收相同的数据。

调用 unpersist() 方法释放广播变量使用的资源。如果应用程序再次使用该变量,系统会重新创建它。如果要永久删除广播变量,可以调用 destroy()。

结论

因此,在本文中,我们解释了 Spark 中的 RDD 共享变量。广播变量用于只读数据,可以在每个位置的第一次使用之前复制,然后用于进一步计算。

然后,我们了解了累加器如何帮助管理共享状态。希望通过这篇文章,您已经理解了共享变量的概念。

更新于:2022年8月25日

浏览量:513

开启您的职业生涯

完成课程获得认证

开始学习
广告
© . All rights reserved.