- PySpark 教程
- PySpark - 首页
- PySpark - 简介
- PySpark - 环境搭建
- PySpark - SparkContext
- PySpark - RDD
- PySpark - 广播变量 & 累加器
- PySpark - SparkConf
- PySpark - SparkFiles
- PySpark - 存储级别 (StorageLevel)
- PySpark - MLlib
- PySpark - 序列化器
- PySpark 有用资源
- PySpark - 快速指南
- PySpark - 有用资源
- PySpark - 讨论
PySpark - 广播变量 & 累加器
为了并行处理,Apache Spark 使用共享变量。当驱动程序向集群中的执行器发送任务时,共享变量的副本会复制到集群的每个节点上,以便用于执行任务。
Apache Spark 支持两种类型的共享变量:
- 广播变量 (Broadcast)
- 累加器 (Accumulator)
让我们详细了解它们。
广播变量 (Broadcast)
广播变量用于在所有节点上保存数据的副本。此变量缓存在所有机器上,不会发送到具有任务的机器上。以下代码块包含 PySpark 广播类的详细信息。
class pyspark.Broadcast ( sc = None, value = None, pickle_registry = None, path = None )
以下示例显示如何使用广播变量。广播变量具有一个名为 value 的属性,该属性存储数据并用于返回广播值。
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
命令 - 广播变量的命令如下:
$SPARK_HOME/bin/spark-submit broadcast.py
输出 - 以下命令的输出如下所示。
Stored data -> [ 'scala', 'java', 'hadoop', 'spark', 'akka' ] Printing a particular element in RDD -> hadoop
累加器 (Accumulator)
累加器变量用于通过关联和交换运算聚合信息。例如,您可以将累加器用于求和运算或计数器(在 MapReduce 中)。以下代码块包含 PySpark 累加器类的详细信息。
class pyspark.Accumulator(aid, value, accum_param)
以下示例显示如何使用累加器变量。累加器变量具有一个名为 value 的属性,类似于广播变量。它存储数据并用于返回累加器的值,但仅可在驱动程序程序中使用。
在此示例中,累加器变量由多个工作器使用并返回累积值。
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
命令 - 累加器变量的命令如下:
$SPARK_HOME/bin/spark-submit accumulator.py
输出 - 以上命令的输出如下所示。
Accumulated value is -> 150
广告