Apache Spark - 核心编程



Spark Core 是整个项目的基石。它提供分布式任务调度、计划和基本的 I/O 功能。Spark 使用一种称为 RDD(弹性分布式数据集)的特殊基本数据结构,它是在机器之间分区的逻辑数据集合。RDD 可以通过两种方式创建;一种是引用外部存储系统中的数据集,另一种是在现有的 RDD 上应用转换(例如 map、filter、reducer、join)。

RDD 抽象通过语言集成 API 公开。这简化了编程复杂性,因为应用程序操作 RDD 的方式类似于操作本地数据集合。

Spark Shell

Spark 提供了一个交互式 shell——一个强大的工具,用于交互式地分析数据。它以 Scala 或 Python 语言提供。Spark 的主要抽象是称为弹性分布式数据集 (RDD) 的项目的分布式集合。RDD 可以从 Hadoop 输入格式(例如 HDFS 文件)创建,也可以通过转换其他 RDD 创建。

打开 Spark Shell

以下命令用于打开 Spark shell。

$ spark-shell

创建简单的 RDD

让我们从文本文件创建一个简单的 RDD。使用以下命令创建简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出为

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了一些**转换**和一些**操作**来操作 RDD。

RDD 转换

RDD 转换返回指向新 RDD 的指针,并允许您在 RDD 之间创建依赖关系。依赖链(依赖关系字符串)中的每个 RDD 都有一个计算其数据的函数,并有一个指向其父 RDD 的指针(依赖关系)。

Spark 是惰性的,因此除非您调用一些将触发作业创建和执行的转换或操作,否则不会执行任何操作。请查看以下单词计数示例代码片段。

因此,RDD 转换不是一组数据,而是在程序中的一步(可能是唯一的一步),告诉 Spark 如何获取数据以及如何处理它。

下面列出了 RDD 转换。

序号 转换及含义
1

map(func)

返回一个新的分布式数据集,该数据集通过函数**func**传递源的每个元素形成。

2

filter(func)

返回一个新的数据集,该数据集通过选择**func**返回 true 的源的那些元素形成。

3

flatMap(func)

类似于 map,但每个输入项可以映射到 0 个或多个输出项(因此func应该返回 Seq 而不是单个项)。

4

mapPartitions(func)

类似于 map,但在 RDD 的每个分区(块)上单独运行,因此当在类型为 T 的 RDD 上运行时,**func**必须为 Iterator<T> ⇒ Iterator<U> 类型。

5

mapPartitionsWithIndex(func)

类似于 map Partitions,但还为**func**提供一个表示分区索引的整数值,因此当在类型为 T 的 RDD 上运行时,**func**必须为 (Int, Iterator<T>) ⇒ Iterator<U> 类型。

6

sample(withReplacement, fraction, seed)

使用给定的随机数生成器种子对数据进行**fraction**采样,可以替换或不替换。

7

union(otherDataset)

返回一个新的数据集,其中包含源数据集和参数中的元素的并集。

8

intersection(otherDataset)

返回一个新的 RDD,其中包含源数据集和参数中元素的交集。

9

distinct([numTasks])

返回一个新的数据集,其中包含源数据集的唯一元素。

10

groupByKey([numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, Iterable<V>) 对的数据集。

注意 - 如果您为了对每个键执行聚合(例如总和或平均值)而进行分组,则使用 reduceByKey 或 aggregateByKey 将产生更好的性能。

11

reduceByKey(func, [numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的归约函数func聚合,该函数必须为 (V, V) ⇒ V 类型。与 groupByKey 一样,归约任务的数量可以通过可选的第二个参数进行配置。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, U) 对的数据集,其中每个键的值使用给定的组合函数和中性“零”值聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。与 groupByKey 一样,归约任务的数量可以通过可选的第二个参数进行配置。

13

sortByKey([ascending], [numTasks])

在 (K, V) 对的数据集上调用时,其中 K 实现 Ordered,返回 (K, V) 对的数据集,这些对按键升序或降序排序,如布尔型 ascending 参数中指定的那样。

14

join(otherDataset, [numTasks])

在类型为 (K, V) 和 (K, W) 的数据集上调用时,返回 (K, (V, W)) 对的数据集,其中包含每个键的所有元素对。通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支持外部联接。

15

cogroup(otherDataset, [numTasks])

在类型为 (K, V) 和 (K, W) 的数据集上调用时,返回 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。此操作也称为 group With。

16

cartesian(otherDataset)

在类型为 T 和 U 的数据集上调用时,返回 (T, U) 对的数据集(所有元素对)。

17

pipe(command, [envVars])

通过 shell 命令(例如 Perl 或 bash 脚本)将 RDD 的每个分区传递到管道中。RDD 元素写入进程的 stdin,输出到其 stdout 的行作为字符串的 RDD 返回。

18

coalesce(numPartitions)

将 RDD 中的分区数量减少到 numPartitions。在筛选大型数据集后更有效地运行操作很有用。

19

repartition(numPartitions)

随机重新整理 RDD 中的数据以创建更多或更少的分区,并在它们之间进行平衡。这始终通过网络混洗所有数据。

20

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区器重新分区 RDD,并在每个结果分区中按其键对记录排序。这比调用 repartition 然后在每个分区中排序更有效,因为它可以将排序推送到混洗机制中。

操作

下表列出了返回值的**操作**。

序号 操作及含义
1

reduce(func)

使用函数**func**聚合数据集的元素(该函数接受两个参数并返回一个)。该函数应该是可交换的和关联的,以便能够在并行中正确计算。

2

collect()

将数据集的所有元素作为驱动程序程序中的数组返回。这通常在筛选或其他返回足够小的数据子集的操作之后很有用。

3

count()

返回数据集中元素的数量。

4

first()

返回数据集的第一个元素(类似于 take (1))。

5

take(n)

返回一个数组,其中包含数据集的前**n**个元素。

6

takeSample (withReplacement,num, [seed])

返回一个数组,其中包含数据集的**num**个元素的随机样本,可以替换或不替换,可以选择预先指定随机数生成器种子。

7

takeOrdered(n, [ordering])

使用其自然顺序或自定义比较器返回 RDD 的前**n**个元素。

8

saveAsTextFile(path)

将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录中。Spark 对每个元素调用 toString 以将其转换为文件中的文本行。

9

saveAsSequenceFile(path) (Java 和 Scala)

将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也可用于隐式可转换为 Writable 的类型(Spark 包含基本类型(如 Int、Double、String 等)的转换)。

10

saveAsObjectFile(path) (Java 和 Scala)

使用 Java 序列化以简单的格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。

11

countByKey()

仅在类型为 (K, V) 的 RDD 上可用。返回 (K, Int) 对的哈希映射,其中包含每个键的计数。

12

foreach(func)

在数据集的每个元素上运行函数**func**。这通常是为了副作用,例如更新累加器或与外部存储系统交互。

注意 - 修改 foreach() 外部累加器以外的变量可能会导致未定义的行为。有关更多详细信息,请参阅理解闭包。

使用 RDD 进行编程

让我们通过示例了解 RDD 编程中一些 RDD 转换和操作的实现。

示例

考虑一个单词计数示例——它计算文档中出现的每个单词。将以下文本作为输入,并将其另存为主目录中的input.txt文件。

input.txt - 输入文件。

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

按照以下步骤执行给定的示例。

打开 Spark-Shell

以下命令用于打开 spark shell。通常,spark 使用 Scala 构建。因此,Spark 程序在 Scala 环境中运行。

$ spark-shell

如果 Spark shell 成功打开,您将找到以下输出。查看输出的最后一行“Spark context available as sc”表示 Spark 容器自动创建名为sc的 spark context 对象。在开始程序的第一步之前,应创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

创建 RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建 RDD。

以下命令用于从给定位置读取文件。此处,使用输入文件名创建新的 RDD。作为 textFile("") 方法中参数给出的字符串是输入文件名的绝对路径。但是,如果只给出文件名,则表示输入文件位于当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行单词计数转换

我们的目标是计算文件中的单词。创建一个 flat map 以将每一行拆分为单词(flatMap(line ⇒ line.split(" ")))。

接下来,使用 map 函数(map(word ⇒ (word, 1))将每个单词作为键读取,值为‘1’(<key, value> = <word,1>)。

最后,通过添加相似键的值来减少这些键(reduceByKey(_+_))。

以下命令用于执行单词计数逻辑。执行此操作后,您将找不到任何输出,因为这不是操作,这是一个转换;指向新的 RDD 或告诉 spark 如何处理给定的数据)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

当前 RDD

在使用RDD时,如果想了解当前RDD的信息,可以使用以下命令。它会显示有关当前RDD及其依赖项的描述,以便进行调试。

scala> counts.toDebugString

缓存转换

可以使用RDD上的`persist()`或`cache()`方法将其标记为持久化。第一次在action中计算它时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

应用Action

应用一个action,例如将所有转换存储到文本文件。`saveAsTextFile(“ ”)`方法的字符串参数是输出文件夹的绝对路径。尝试以下命令将输出保存到文本文件中。在以下示例中,'output'文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端,转到主目录(Spark在另一个终端中执行的地方)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

以下命令用于查看来自Part-00000文件的输出。

[hadoop@localhost output]$ cat part-00000

输出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

以下命令用于查看来自Part-00001文件的输出。

[hadoop@localhost output]$ cat part-00001 

输出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

取消持久化存储

在取消持久化之前,如果想查看此应用程序使用的存储空间,请在浏览器中使用以下URL。

https://127.0.0.1:4040

您将看到以下屏幕,其中显示了应用程序使用的存储空间,这些应用程序正在Spark shell上运行。

storage space

如果要取消持久化特定RDD的存储空间,请使用以下命令。

Scala> counts.unpersist()

您将看到如下输出:

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下URL。

https://127.0.0.1:4040/

您将看到以下屏幕。它显示了应用程序使用的存储空间,这些应用程序正在Spark shell上运行。

Storage space for application
广告