- PySpark 教程
- PySpark - 首页
- PySpark - 简介
- PySpark - 环境搭建
- PySpark - SparkContext
- PySpark - RDD
- PySpark - 广播变量 & 累加器
- PySpark - SparkConf
- PySpark - SparkFiles
- PySpark - 存储级别
- PySpark - MLlib
- PySpark - 序列化器
- PySpark 有用资源
- PySpark - 快速指南
- PySpark - 有用资源
- PySpark - 讨论
PySpark - RDD
现在我们已经在系统上安装并配置了 PySpark,我们就可以在 Apache Spark 上使用 Python 进行编程了。但在开始之前,让我们先了解 Spark 中一个基本的概念 - RDD。
RDD 代表 **弹性分布式数据集**,它们是在多个节点上运行和操作的元素,用于在集群上进行并行处理。RDD 是不可变的元素,这意味着一旦创建了 RDD 就无法更改它。RDD 也是容错的,因此在发生任何故障时,它们会自动恢复。您可以对这些 RDD 应用多个操作来完成特定任务。
要对这些 RDD 应用操作,有两种方法:
- 转换和
- 行动
让我们详细了解这两种方法。
**转换** - 这些是在 RDD 上应用的操作,用于创建新的 RDD。Filter、groupBy 和 map 是转换的示例。
**行动** - 这些是在 RDD 上应用的操作,指示 Spark 执行计算并将结果发送回驱动程序。
要在 PySpark 中应用任何操作,我们首先需要创建一个 **PySpark RDD**。以下代码块详细介绍了 PySpark RDD 类:
class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) )
让我们看看如何使用 PySpark 运行一些基本操作。以下 Python 文件中的代码创建了 RDD words,它存储了一组提到的单词。
words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] )
我们现在将在 words 上运行一些操作。
count()
返回 RDD 中元素的数量。
----------------------------------------count.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) counts = words.count() print "Number of elements in RDD -> %i" % (counts) ----------------------------------------count.py---------------------------------------
**命令** - count() 的命令如下:
$SPARK_HOME/bin/spark-submit count.py
**输出** - 上述命令的输出为:
Number of elements in RDD → 8
collect()
返回 RDD 中的所有元素。
----------------------------------------collect.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Collect app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) coll = words.collect() print "Elements in RDD -> %s" % (coll) ----------------------------------------collect.py---------------------------------------
**命令** - collect() 的命令如下:
$SPARK_HOME/bin/spark-submit collect.py
**输出** - 上述命令的输出为:
Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
foreach(f)
仅返回满足 foreach 内部函数条件的元素。在以下示例中,我们在 foreach 中调用 print 函数,该函数打印 RDD 中的所有元素。
----------------------------------------foreach.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "ForEach app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) def f(x): print(x) fore = words.foreach(f) ----------------------------------------foreach.py---------------------------------------
**命令** - foreach(f) 的命令如下:
$SPARK_HOME/bin/spark-submit foreach.py
**输出** - 上述命令的输出为:
scala java hadoop spark akka spark vs hadoop pyspark pyspark and spark
filter(f)
返回一个新的 RDD,其中包含满足 filter 内部函数的元素。在以下示例中,我们过滤掉包含“spark”的字符串。
----------------------------------------filter.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print "Fitered RDD -> %s" % (filtered) ----------------------------------------filter.py----------------------------------------
**命令** - filter(f) 的命令如下:
$SPARK_HOME/bin/spark-submit filter.py
**输出** - 上述命令的输出为:
Fitered RDD -> [ 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
map(f, preservesPartitioning = False)
通过将函数应用于 RDD 中的每个元素来返回一个新的 RDD。在以下示例中,我们形成了一个键值对,并将每个字符串映射到值为 1。
----------------------------------------map.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Map app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_map = words.map(lambda x: (x, 1)) mapping = words_map.collect() print "Key value pair -> %s" % (mapping) ----------------------------------------map.py---------------------------------------
**命令** - map(f, preservesPartitioning=False) 的命令如下:
$SPARK_HOME/bin/spark-submit map.py
**输出** - 上述命令的输出为:
Key value pair -> [ ('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1) ]
reduce(f)
在执行指定的交换和关联二元运算后,返回 RDD 中的元素。在以下示例中,我们从 operator 中导入 add 包,并将其应用于 'num' 以执行简单的加法运算。
----------------------------------------reduce.py--------------------------------------- from pyspark import SparkContext from operator import add sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print "Adding all the elements -> %i" % (adding) ----------------------------------------reduce.py---------------------------------------
**命令** - reduce(f) 的命令如下:
$SPARK_HOME/bin/spark-submit reduce.py
**输出** - 上述命令的输出为:
Adding all the elements -> 15
join(other, numPartitions = None)
它返回一个 RDD,其中包含具有匹配键的元素对以及该特定键的所有值。在以下示例中,两个不同的 RDD 中有两个元素对。连接这两个 RDD 后,我们得到一个 RDD,其中包含具有匹配键及其值的元素。
----------------------------------------join.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Join app") x = sc.parallelize([("spark", 1), ("hadoop", 4)]) y = sc.parallelize([("spark", 2), ("hadoop", 5)]) joined = x.join(y) final = joined.collect() print "Join RDD -> %s" % (final) ----------------------------------------join.py---------------------------------------
**命令** - join(other, numPartitions = None) 的命令如下:
$SPARK_HOME/bin/spark-submit join.py
**输出** - 上述命令的输出为:
Join RDD -> [ ('spark', (1, 2)), ('hadoop', (4, 5)) ]
cache()
使用默认存储级别 (MEMORY_ONLY) 持久化此 RDD。您还可以检查 RDD 是否已缓存。
----------------------------------------cache.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Cache app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words.cache() caching = words.persist().is_cached print "Words got chached > %s" % (caching) ----------------------------------------cache.py---------------------------------------
**命令** - cache() 的命令如下:
$SPARK_HOME/bin/spark-submit cache.py
**输出** - 上述程序的输出为:
Words got cached -> True
这些是在 PySpark RDD 上执行的一些最重要的操作。