- Apache Spark 教程
- Apache Spark - 首页
- Apache Spark - 简介
- Apache Spark - RDD
- Apache Spark - 安装
- Apache Spark - 核心编程
- Apache Spark - 部署
- 高级 Spark 编程
- Apache Spark 有用资源
- Apache Spark - 快速指南
- Apache Spark - 有用资源
- Apache Spark - 讨论
Apache Spark - 部署
Spark 应用程序使用 `spark-submit` 命令行工具部署到集群。它通过统一的接口使用所有相应的集群管理器,因此无需为每个管理器单独配置应用程序。
示例
让我们以之前用过的单词计数为例,使用shell命令。这里我们将相同的例子作为一个Spark应用程序。
示例输入
以下文本是输入数据,文件名是 in.txt。
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
查看以下程序:
SparkWordCount.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) /* local = master URL; Word Count = application name; */ /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ /* Map = variables to work nodes */ /*creating an inputRDD to read text file (in.txt) through Spark context*/ val input = sc.textFile("in.txt") /* Transform the inputRDD into countRDD */ val count = input.flatMap(line ⇒ line.split(" ")) .map(word ⇒ (word, 1)) .reduceByKey(_ + _) /* saveAsTextFile method is an action that effects on the RDD */ count.saveAsTextFile("outfile") System.out.println("OK"); } }
将上述程序保存到名为 SparkWordCount.scala 的文件中,并将其放置在名为 spark-application 的用户定义目录中。
注意:将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 将文本文件中的行标记化为单词,使用 map() 方法计算单词频率,并使用 reduceByKey() 方法计算每个单词的重复次数。
请按照以下步骤提交此应用程序。在终端通过 spark-application 目录执行所有步骤。
步骤 1:下载 Spark Jar
编译需要 Spark core jar 包,因此,请从以下链接下载 spark-core_2.10-1.3.0.jar Spark core jar 并将 jar 文件从下载目录移动到 spark-application 目录。
步骤 2:编译程序
使用以下命令编译上述程序。此命令应从 spark-application 目录执行。这里,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar 是从 Spark 库中获取的 Hadoop 支持 jar 包。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
步骤 3:创建 JAR 包
使用以下命令创建 Spark 应用程序的 jar 文件。这里,wordcount 是 jar 文件的文件名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
步骤 4:提交 Spark 应用程序
使用以下命令提交 Spark 应用程序:
spark-submit --class SparkWordCount --master local wordcount.jar
如果成功执行,您将看到以下输出。以下输出中的 OK 用于用户标识,它是程序的最后一行。如果您仔细阅读以下输出,您会发现不同的内容,例如:
- 成功启动服务“sparkDriver”在端口 42954 上
- MemoryStore 以 267.3 MB 的容量启动
- SparkUI 启动于 http://192.168.1.217:4040
- 已添加 JAR 文件:/home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile at SparkPi.scala:11) 在 0.566 秒内完成
- 停止 Spark web UI 在 http://192.168.1.217:4040
- MemoryStore 已清除
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s OK 15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 15/07/08 13:56:14 INFO Utils: Shutdown hook called 15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
步骤 5:检查输出
程序成功执行后,您将在 spark-application 目录中找到名为 outfile 的目录。
以下命令用于打开和检查 outfile 目录中的文件列表。
$ cd outfile $ ls Part-00000 part-00001 _SUCCESS
检查 part-00000 文件中输出的命令:
$ cat part-00000 (people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
检查 part-00001 文件中输出的命令:
$ cat part-00001 (walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
请阅读下一节以了解有关“spark-submit”命令的更多信息。
Spark-submit 语法
spark-submit [options] <app jar | python file> [app arguments]
选项
序号 | 选项 | 描述 |
---|---|---|
1 | --master | spark://host:port, mesos://host:port, yarn 或 local。 |
2 | --deploy-mode | 是否在本地 ("client") 或集群中的一个工作机器上 ("cluster") 启动驱动程序程序(默认:client)。 |
3 | --class | 应用程序的主类(对于 Java/Scala 应用程序)。 |
4 | --name | 应用程序的名称。 |
5 | --jars | 要包含在驱动程序和执行程序类路径中的逗号分隔的本地 jar 文件列表。 |
6 | --packages | 要包含在驱动程序和执行程序类路径中的 jar 文件的 Maven 坐标的逗号分隔列表。 |
7 | --repositories | 用于搜索使用 --packages 给出的 Maven 坐标的其他远程存储库的逗号分隔列表。 |
8 | --py-files | 要放在 Python 应用程序的 PYTHON PATH 上的 .zip、.egg 或 .py 文件的逗号分隔列表。 |
9 | --files | 要放在每个执行程序的工作目录中的文件的逗号分隔列表。 |
10 | --conf (prop=val) | 任意的 Spark 配置属性。 |
11 | --properties-file | 要从中加载额外属性的文件的路径。如果未指定,这将查找 conf/spark-defaults.conf。 |
12 | --driver-memory | 驱动程序的内存(例如 1000M、2G)(默认:512M)。 |
13 | --driver-java-options | 要传递给驱动程序的额外 Java 选项。 |
14 | --driver-library-path | 要传递给驱动程序的额外库路径条目。 |
15 | --driver-class-path | 要传递给驱动程序的额外类路径条目。 请注意,使用 --jars 添加的 jar 包会自动包含在类路径中。 |
16 | --executor-memory | 每个执行程序的内存(例如 1000M、2G)(默认:1G)。 |
17 | --proxy-user | 提交应用程序时要模拟的用户。 |
18 | --help, -h | 显示此帮助消息并退出。 |
19 | --verbose, -v | 打印额外的调试输出。 |
20 | --version | 打印当前 Spark 的版本。 |
21 | --driver-cores NUM | 驱动程序的核心数(默认:1)。 |
22 | --supervise | 如果给出,则在发生故障时重新启动驱动程序。 |
23 | --kill | 如果给出,则终止指定的驱动程序。 |
24 | --status | 如果给出,则请求指定的驱动程序的状态。 |
25 | --total-executor-cores | 所有执行程序的总核心数。 |
26 | --executor-cores | 每个执行程序的核心数。(默认:在 YARN 模式下为 1,或在独立模式下为工作程序上的所有可用核心)。 |