Spark 中的 RDD 共享变量
RDD的全称是弹性分布式数据集。Spark 的性能基于这个容错的数据集,使其能够有效地处理各种大数据处理任务,包括 MapReduce、流处理、SQL、机器学习、图计算等。
Spark 支持多种编程语言,包括 Scala、Python 和 R。RDD 也支持这些语言的持久化。
如何创建 RDD
Spark 在许多地方支持 RDD 架构,包括本地文件系统、HDFS 文件系统、内存和 HBase。
对于本地文件系统,我们可以通过以下方式创建 RDD:
val distFile = sc.textFile("file:///user/root/rddData.txt")
默认情况下,Spark 从 HDFS 文件系统获取数据。因此,在 HDFS 文件系统中创建 RDD 的方法如下:
val distFile = sc.textFile("/user/root/rddData.txt")
用户也可以通过以下方式指定 HDFS URL:
val distFile = sc.textFile("hdfs://:4440/user/rddData.txt")
RDD 共享变量
如果任何函数在 Spark 中传播到执行函数,它都会应用于集群节点。Spark 使用计算中使用的每个变量的不同副本。这些更改会复制到每台机器,并且不会将对远程设备的动态更新恢复到驱动程序系统。
如果远程节点执行 Spark 的执行函数来工作,系统会将所有函数变量复制到该节点。如果这些变量在其他节点上更新,系统将不会更新当前节点变量,直到它被恢复到驱动程序系统。通常,在所有活动中灵活的读写能力效果不佳。Spark 使用两种类型的共享变量:
累加器
类似于 C 语言中的全局变量,Spark 支持多个函数更新同一个变量。这允许多个任务顺序地更新同一个变量。
创建累加器时,可以使用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 创建长整型和双精度型两种类型的累加器。任务可以使用 add 方法向累加器添加值。执行器无法读取累加器变量的值,只有驱动程序才能读取其最终值。
scala> val accum = sc.longAccumulator("Accumulator Data")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(Accumulator Data), value: 0)
scala> sc.parallelize(Array(6, 7, 8, 9)).foreach(x => accum.add(x))
22/02/09 01:37:51 INFO SparkContext: Tasks finished in 0.274529s
广播变量
广播变量允许开发者在每个位置存储一个只读的只读副本,而无需将其复制到每个任务。这使得系统可以有效地将大型数据集复制到每个节点。Spark 通过使用广播变量来减少网络传输成本。
只有当任务有多个阶段,需要相同的数据,或者缓存的数据被反序列化时,广播变量才有效。要创建广播变量,可以使用以下命令:
scala> val broadcastVar = sc.broadcast(Array(6, 7, 8)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(6, 7, 8)
找到值字段中的广播变量。顾名思义,广播变量是单向从驱动程序发送到任务的。系统无法更新广播变量,驱动程序也无法更新它们。确保所有节点都接收相同的数据。
调用 unpersist() 方法释放广播变量使用的资源。如果应用程序再次使用该变量,系统会重新创建它。如果要永久删除广播变量,可以调用 destroy()。
结论
因此,在本文中,我们解释了 Spark 中的 RDD 共享变量。广播变量用于只读数据,可以在每个位置的第一次使用之前复制,然后用于进一步计算。
然后,我们了解了累加器如何帮助管理共享状态。希望通过这篇文章,您已经理解了共享变量的概念。
数据结构
网络
关系数据库管理系统 (RDBMS)
操作系统
Java
iOS
HTML
CSS
Android
Python
C 编程
C++
C#
MongoDB
MySQL
Javascript
PHP