高级 Spark 编程



Spark 包含两种不同类型的共享变量——一种是**广播变量**,另一种是**累加器**。

  • **广播变量** - 用于高效地分发大型值。

  • **累加器** - 用于聚合特定集合的信息。

广播变量

广播变量允许程序员将只读变量缓存在每台机器上,而不是随任务一起发送它的副本。例如,它们可以用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

Spark 操作通过一系列阶段执行,这些阶段由分布式“洗牌”操作分隔。Spark 自动广播每个阶段内任务所需的公共数据。

以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着,仅当跨多个阶段的任务需要相同的数据或当以反序列化形式缓存数据很重要时,显式创建广播变量才有用。

通过调用**SparkContext.broadcast(v)**从变量**v**创建广播变量。广播变量是**v**的包装器,其值可以通过调用**value**方法来访问。下面给出的代码显示了这一点 -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

**输出** -

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

创建广播变量后,应在集群上运行的任何函数中使用它,而不是使用值**v**,以确保**v**不会多次发送到节点。此外,在广播后不应修改对象**v**,以确保所有节点获得广播变量的相同值。

累加器

累加器是仅通过关联操作“添加”的变量,因此可以在并行环境中得到有效支持。它们可以用于实现计数器(如 MapReduce 中)或总和。Spark 原生支持数值类型的累加器,程序员可以添加对新类型的支持。如果使用名称创建累加器,它们将显示在**Spark 的 UI** 中。这对于了解正在运行的阶段的进度很有用(注意 - Python 中尚不支持此功能)。

通过调用**SparkContext.accumulator(v)**从初始值**v**创建累加器。然后,在集群上运行的任务可以使用**add**方法或+=运算符(在 Scala 和 Python 中)添加到它。但是,它们无法读取其值。只有驱动程序程序可以使用其**value**方法读取累加器的值。

下面给出的代码显示了一个用于将数组元素加起来的累加器 -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

如果要查看上述代码的输出,请使用以下命令 -

scala> accum.value 

输出

res2: Int = 10 

数值 RDD 操作

Spark 允许您使用预定义的 API 方法之一对数值数据执行不同的操作。Spark 的数值操作使用流算法实现,该算法允许一次构建一个元素的模型。

这些操作通过调用**status()**方法计算并作为**StatusCounter**对象返回。

以下是**StatusCounter**中可用的数值方法列表。

序号 方法及含义
1

count()

RDD 中元素的数量。

2

Mean()

RDD 中元素的平均值。

3

Sum()

RDD 中元素的总值。

4

Max()

RDD 中所有元素中的最大值。

5

Min()

RDD 中所有元素中的最小值。

6

Variance()

元素的方差。

7

Stdev()

标准差。

如果只想使用其中一种方法,可以直接在 RDD 上调用相应的方法。

广告