Spark SQL 快速指南



Spark - 简介

各行业广泛使用 Hadoop 来分析其数据集。原因在于 Hadoop 框架基于简单的编程模型(MapReduce),它能够提供可扩展、灵活、容错且经济高效的计算解决方案。这里,主要关注的是在处理大型数据集时保持速度,包括查询之间的等待时间和程序运行的等待时间。

Apache 软件基金会推出了 Spark,旨在加快 Hadoop 计算软件的处理速度。

与普遍的看法相反,Spark 不是 Hadoop 的修改版本,实际上也不依赖于 Hadoop,因为它拥有自己的集群管理机制。Hadoop 只是实现 Spark 的一种方式。

Spark 以两种方式使用 Hadoop——一种是存储,另一种是处理。由于 Spark 拥有自己的集群管理计算,它只将 Hadoop 用于存储目的。

Apache Spark

Apache Spark 是一种闪电般快速的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型,以便高效地将其用于更多类型的计算,包括交互式查询和流处理。Spark 的主要特点是其内存集群计算,这提高了应用程序的处理速度。

Spark 旨在涵盖各种工作负载,例如批处理应用程序、迭代算法、交互式查询和流处理。除了支持各个系统中的所有这些工作负载外,它还减少了维护单独工具的管理负担。

Apache Spark 的发展

Spark 是 Hadoop 的一个子项目,由 Matei Zaharia 于 2009 年在加州大学伯克利分校的 AMPLab 开发。它于 2010 年在 BSD 许可下开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 已成为自 2014 年 2 月以来的顶级 Apache 项目。

Apache Spark 的特性

Apache Spark 具有以下特性:

  • 速度 - Spark 有助于在 Hadoop 集群中运行应用程序,在内存中运行速度快达 100 倍,在磁盘上运行速度快达 10 倍。这是通过减少对磁盘的读/写操作次数实现的。它将中间处理数据存储在内存中。

  • 支持多种语言 - Spark 提供了 Java、Scala 或 Python 的内置 API。因此,您可以使用不同的语言编写应用程序。Spark 提供了 80 多个用于交互式查询的高级运算符。

  • 高级分析 - Spark 不仅支持“Map”和“reduce”,还支持 SQL 查询、流数据、机器学习 (ML) 和图算法。

Spark 基于 Hadoop

下图显示了 Spark 如何与 Hadoop 组件结合使用的三种方式。

Spark Built on Hadoop

Spark 部署方式如下所述,共有三种。

  • 独立模式 - Spark 独立模式部署意味着 Spark 位于 HDFS(Hadoop 分布式文件系统)之上,并为 HDFS 显式分配空间。在这里,Spark 和 MapReduce 将并行运行以覆盖集群上的所有 Spark 作业。

  • Hadoop Yarn - Hadoop Yarn 部署意味着 Spark 运行在 Yarn 上,无需任何预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈顶部运行。

  • Spark in MapReduce (SIMR) - Spark in MapReduce 用于除了独立部署之外还启动 Spark 作业。使用 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理权限。

Spark 的组件

下图描述了 Spark 的不同组件。

Components of Spark

Apache Spark Core

Spark Core 是 Spark 平台的基础通用执行引擎,所有其他功能都基于它构建。它提供内存计算和引用外部存储系统中的数据集。

Spark SQL

Spark SQL 是 Spark Core 之上的一个组件,它引入了一种名为 SchemaRDD 的新的数据抽象,该抽象为结构化和半结构化数据提供支持。

Spark Streaming

Spark Streaming 利用 Spark Core 的快速调度功能来执行流分析。它以小批次的形式摄取数据,并在这些小批次数据上执行 RDD(弹性分布式数据集)转换。

MLlib(机器学习库)

MLlib 是 Spark 之上的一个分布式机器学习框架,因为它基于分布式内存的 Spark 架构。根据 MLlib 开发人员对交替最小二乘法 (ALS) 实现进行的基准测试,Spark MLlib 的速度是 Hadoop 基于磁盘版本的Apache Mahout(在 Mahout 获得 Spark 接口之前)的九倍。

GraphX

GraphX 是 Spark 之上的一个分布式图处理框架。它提供了一个用于表达图计算的 API,可以使用 Pregel 抽象 API 对用户定义的图进行建模。它还为此抽象提供了优化的运行时。

Spark – RDD

弹性分布式数据集

弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是一个不可变的分布式对象集合。RDD 中的每个数据集都分为逻辑分区,这些分区可以在集群的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。

正式地说,RDD 是一个只读的分区记录集合。RDD 可以通过对稳定存储中的数据或其他 RDD 执行确定性操作来创建。RDD 是一个容错的元素集合,可以并行操作。

创建 RDD 有两种方法:并行化驱动程序程序中已有的集合,或引用外部存储系统(例如共享文件系统、HDFS、HBase 或任何提供 Hadoop 输入格式的数据源)中的数据集。

Spark 利用 RDD 的概念来实现更快更高效的 MapReduce 操作。让我们首先讨论 MapReduce 操作是如何进行的,以及为什么它们效率不高。

MapReduce 中的数据共享速度慢

MapReduce 被广泛采用,用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而无需担心工作分配和容错。

不幸的是,在大多数现有框架中,在计算之间(例如,在两个 MapReduce 作业之间)重用数据的唯一方法是将其写入外部稳定存储系统(例如 HDFS)。虽然此框架提供了许多用于访问集群计算资源的抽象,但用户仍然需要更多。

迭代交互式应用程序都需要在并行作业之间更快地共享数据。由于复制序列化磁盘 I/O,MapReduce 中的数据共享速度很慢。关于存储系统,大多数 Hadoop 应用程序花费 90% 以上的时间进行 HDFS 读写操作。

MapReduce 上的迭代操作

在多阶段应用程序中跨多个计算重用中间结果。下图说明了当前框架在 MapReduce 上执行迭代操作时的工作方式。由于数据复制、磁盘 I/O 和序列化,这会产生大量的开销,从而使系统变慢。

Iterative Operations on MapReduce

MapReduce 上的交互式操作

用户对同一数据集的子集运行 ad-hoc 查询。每个查询都将在稳定存储上执行磁盘 I/O,这可能会主导应用程序的执行时间。

下图说明了当前框架在 MapReduce 上执行交互式查询时的工作方式。

Interactive Operations on MapReduce

使用 Spark RDD 进行数据共享

由于复制序列化磁盘 I/O,MapReduce 中的数据共享速度很慢。大多数 Hadoop 应用程序花费 90% 以上的时间进行 HDFS 读写操作。

研究人员认识到这个问题,开发了一个名为 Apache Spark 的专用框架。Spark 的核心思想是弹性分布式数据集 (RDD);它支持内存处理计算。这意味着它将内存状态作为跨作业的对象存储,并且该对象可以在这些作业之间共享。内存中的数据共享速度比网络和磁盘快 10 到 100 倍。

现在让我们尝试找出 Spark RDD 中迭代和交互式操作是如何进行的。

Spark RDD 上的迭代操作

下图显示了 Spark RDD 上的迭代操作。它将中间结果存储在分布式内存中而不是稳定存储(磁盘)中,从而使系统更快。

注意 - 如果分布式内存 (RAM) 足够存储中间结果(作业状态),则它会将这些结果存储在磁盘上。

Iterative Operations on Spark RDD

Spark RDD 上的交互式操作

此图显示了 Spark RDD 上的交互式操作。如果对同一数据集重复运行不同的查询,则可以将此特定数据保存在内存中以获得更好的执行时间。

Interactive Operations on Spark RDD

默认情况下,每次对转换后的 RDD 运行操作时,都可能会重新计算它。但是,您也可以将 RDD 保存在内存中,在这种情况下,Spark 将在下一次查询它时将元素保留在集群中以实现更快的访问。还支持将 RDD 保存在磁盘上或跨多个节点进行复制。

Spark - 安装

Spark 是 Hadoop 的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示了如何安装 Apache Spark。

步骤 1:验证 Java 安装

Java 安装是安装 Spark 的一项强制性操作。尝试使用以下命令验证 JAVA 版本。

$java -version

如果 Java 已安装在您的系统上,您将看到以下响应:

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您系统上未安装 Java,请在继续下一步之前安装 Java。

步骤 2:验证 Scala 安装

您应该使用 Scala 语言来实现 Spark。因此,让我们使用以下命令验证 Scala 的安装。

$scala -version

如果您的系统上已经安装了 Scala,您将看到以下响应:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

如果您系统上没有安装 Scala,则继续执行下一步安装 Scala。

步骤 3:下载 Scala

访问以下链接下载最新版本的 Scala 下载 Scala。在本教程中,我们使用的是 scala-2.11.6 版本。下载后,您将在下载文件夹中找到 Scala 的 tar 文件。

步骤 4:安装 Scala

按照以下步骤安装 Scala。

解压 Scala tar 文件

输入以下命令解压 Scala tar 文件:

$ tar xvf scala-2.11.6.tgz

移动 Scala 软件文件

使用以下命令将 Scala 软件文件移动到相应的目录 **(/usr/local/scala)**。

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

设置 Scala 的 PATH

使用以下命令设置 Scala 的 PATH。

$ export PATH = $PATH:/usr/local/scala/bin

验证 Scala 安装

安装后,最好验证一下。使用以下命令验证 Scala 安装。

$scala -version

如果您的系统上已经安装了 Scala,您将看到以下响应:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

步骤 5:下载 Apache Spark

访问以下链接下载最新版本的 Spark 下载 Spark。在本教程中,我们使用的是 **spark-1.3.1-bin-hadoop2.6** 版本。下载后,您将在下载文件夹中找到 Spark 的 tar 文件。

步骤 6:安装 Spark

按照以下步骤安装 Spark。

解压 Spark tar 文件

以下命令用于解压 Spark tar 文件:

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

移动 Spark 软件文件

以下命令用于将 Spark 软件文件移动到相应的目录 **(/usr/local/spark)**。

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

设置 Spark 的环境

将以下行添加到 ~ **/.bashrc** 文件中。这意味着将 Spark 软件文件所在的路径添加到 PATH 变量中。

export PATH = $PATH:/usr/local/spark/bin

使用以下命令更新 ~/.bashrc 文件。

$ source ~/.bashrc

步骤 7:验证 Spark 安装

输入以下命令打开 Spark shell。

$spark-shell

如果 Spark 安装成功,您将看到以下输出。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
   / __/__ ___ _____/ /__
   _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark SQL - 简介

Spark 引入了一个用于结构化数据处理的编程模块,称为 Spark SQL。它提供了一个称为 DataFrame 的编程抽象,并且可以充当分布式 SQL 查询引擎。

Spark SQL 的特性

以下是 Spark SQL 的特性:

  • **集成性** - 将 SQL 查询与 Spark 程序无缝混合。Spark SQL 允许您将结构化数据作为 Spark 中的分布式数据集 (RDD) 进行查询,并提供 Python、Scala 和 Java 的集成 API。这种紧密集成使得轻松运行 SQL 查询以及复杂的分析算法成为可能。

  • **统一的数据访问** - 从各种来源加载和查询数据。Schema-RDD 提供了一个单一接口,用于高效地处理结构化数据,包括 Apache Hive 表、parquet 文件和 JSON 文件。

  • **Hive 兼容性** - 在现有仓库上运行未修改的 Hive 查询。Spark SQL 重用 Hive 前端和 MetaStore,使您可以完全兼容现有的 Hive 数据、查询和 UDF。只需将其与 Hive 一起安装即可。

  • **标准连接性** - 通过 JDBC 或 ODBC 连接。Spark SQL 包含一个具有行业标准 JDBC 和 ODBC 连接性的服务器模式。

  • **可扩展性** - 对交互式查询和长时间查询使用相同的引擎。Spark SQL 利用 RDD 模型来支持查询中间的容错能力,使其能够扩展到大型作业。无需担心对历史数据使用不同的引擎。

Spark SQL 架构

下图解释了 Spark SQL 的架构:

Spark SQL Architecture

此架构包含三个层,即语言 API、Schema RDD 和数据源。

  • **语言 API** - Spark 与不同的语言和 Spark SQL 兼容。它也受这些语言支持 - API(python、scala、java、HiveQL)。

  • **Schema RDD** - Spark Core 使用称为 RDD 的特殊数据结构进行设计。通常,Spark SQL 处理模式、表和记录。因此,我们可以使用 Schema RDD 作为临时表。我们可以将此 Schema RDD 称为 DataFrame。

  • **数据源** - 通常,spark-core 的数据源是文本文件、Avro 文件等。但是,Spark SQL 的数据源不同。它们是 Parquet 文件、JSON 文档、HIVE 表和 Cassandra 数据库。

我们将在后续章节中详细讨论这些内容。

Spark SQL - DataFrame

DataFrame 是一个分布式数据集合,它被组织成命名列。从概念上讲,它等效于具有良好优化技术的关联表。

DataFrame 可以从各种不同的来源构建,例如 Hive 表、结构化数据文件、外部数据库或现有的 RDD。该 API 是为现代大数据和数据科学应用程序设计的,其灵感来自 **R 编程中的 DataFrame** 和 **Python 中的 Pandas**。

DataFrame 的特性

这是一组 DataFrame 的几个特征:

  • 能够在单节点集群到大型集群上处理从千字节到拍字节大小的数据。

  • 支持不同的数据格式(Avro、csv、elastic search 和 Cassandra)和存储系统(HDFS、HIVE 表、mysql 等)。

  • 通过 Spark SQL Catalyst 优化器(树转换框架)进行最先进的优化和代码生成。

  • 可以通过 Spark-Core 轻松集成所有大数据工具和框架。

  • 提供 Python、Java、Scala 和 R 编程的 API。

SQLContext

SQLContext 是一个类,用于初始化 Spark SQL 的功能。初始化 SQLContext 类对象需要 SparkContext 类对象 (sc)。

以下命令用于通过 spark-shell 初始化 SparkContext。

$ spark-shell

默认情况下,当 spark-shell 启动时,SparkContext 对象以名称 **sc** 初始化。

使用以下命令创建 SQLContext。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

示例

让我们考虑一个名为 **employee.json** 的 JSON 文件中的员工记录示例。使用以下命令创建一个 DataFrame (df) 并读取名为 **employee.json** 的 JSON 文档,其内容如下。

**employee.json** - 将此文件放在当前 **scala>** 指针所在的目录中。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

DataFrame 操作

DataFrame 提供了一种用于结构化数据操作的特定领域语言。在这里,我们包含一些使用 DataFrame 进行结构化数据处理的基本示例。

按照以下步骤执行 DataFrame 操作:

读取 JSON 文档

首先,我们必须读取 JSON 文档。基于此,生成一个名为 (dfs) 的 DataFrame。

使用以下命令读取名为 **employee.json** 的 JSON 文档。数据显示为一个表,字段为:id、name 和 age。

scala> val dfs = sqlContext.read.json("employee.json")

**输出** - 字段名称是从 **employee.json** 自动获取的。

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

显示数据

如果要查看 DataFrame 中的数据,请使用以下命令。

scala> dfs.show()

**输出** - 您可以以表格格式查看员工数据。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

使用 printSchema 方法

如果要查看 DataFrame 的结构(模式),请使用以下命令。

scala> dfs.printSchema()

输出

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

使用 Select 方法

使用以下命令从 DataFrame 中获取三个列中的 **name** 列。

scala> dfs.select("name").show()

**输出** - 您可以看到 **name** 列的值。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

使用 Age 过滤器

使用以下命令查找年龄大于 23 (age > 23) 的员工。

scala> dfs.filter(dfs("age") > 23).show()

输出

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

使用 groupBy 方法

使用以下命令计算相同年龄的员工人数。

scala> dfs.groupBy("age").count().show()

**输出** - 两名员工的年龄为 23。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

以编程方式运行 SQL 查询

SQLContext 使应用程序能够在运行 SQL 函数的同时以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

通常,在后台,SparkSQL 支持两种不同的方法将现有的 RDD 转换为 DataFrame:

序号 方法和描述
1 使用反射推断模式

此方法使用反射来生成包含特定类型对象的 RDD 的模式。

2 以编程方式指定模式

创建 DataFrame 的第二种方法是通过编程接口,该接口允许您构建模式,然后将其应用于现有 RDD。

Spark SQL - 数据源

DataFrame 接口允许不同的数据源在 Spark SQL 上工作。它是一个临时表,可以像普通 RDD 一样操作。将 DataFrame 注册为表允许您在其数据上运行 SQL 查询。

在本章中,我们将描述使用不同的 Spark 数据源加载和保存数据的常用方法。此后,我们将详细讨论内置数据源可用的特定选项。

SparkSQL 中有不同类型的数据源可用,其中一些列在下面:

序号 数据源
1 JSON 数据集

Spark SQL 可以自动捕获 JSON 数据集的模式并将其加载为 DataFrame。

2 Hive 表

Hive 作为 HiveContext 与 Spark 库捆绑在一起,它继承自 SQLContext。

3 Parquet 文件

Parquet 是一种列式格式,许多数据处理系统都支持它。

广告
© . All rights reserved.