Spark SQL - DataFrame



DataFrame 是一个分布式的数据集合,组织成命名列。从概念上讲,它等同于具有良好优化技术的关联表。

DataFrame 可以从各种不同的来源构建,例如 Hive 表、结构化数据文件、外部数据库或现有的 RDD。这个 API 是为现代大数据和数据科学应用程序设计的,其灵感来自R 编程中的 DataFramePython 中的 Pandas

DataFrame 的特性

以下是 DataFrame 的一些特征:

  • 能够在单节点集群到大型集群上处理从千字节到拍字节大小的数据。

  • 支持不同的数据格式(Avro、csv、Elasticsearch 和 Cassandra)和存储系统(HDFS、Hive 表、MySQL 等)。

  • 通过 Spark SQL Catalyst 优化器(树转换框架)进行最先进的优化和代码生成。

  • 可以通过 Spark-Core 轻松集成所有大数据工具和框架。

  • 提供 Python、Java、Scala 和 R 编程的 API。

SQLContext

SQLContext 是一个类,用于初始化 Spark SQL 的功能。初始化 SQLContext 类对象需要 SparkContext 类对象 (sc)。

以下命令用于通过 spark-shell 初始化 SparkContext。

$ spark-shell

默认情况下,当 spark-shell 启动时,SparkContext 对象以名称sc初始化。

使用以下命令创建 SQLContext。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

示例

让我们考虑一个名为employee.json 的 JSON 文件中的员工记录示例。使用以下命令创建一个 DataFrame (df) 并读取名为employee.json 的 JSON 文档,其内容如下所示。

employee.json - 将此文件放在当前scala> 指针所在的目录中。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

DataFrame 操作

DataFrame 提供了一种用于结构化数据操作的特定领域语言。在这里,我们包含一些使用 DataFrame 进行结构化数据处理的基本示例。

按照以下步骤执行 DataFrame 操作:

读取 JSON 文档

首先,我们必须读取 JSON 文档。基于此,生成一个名为 (dfs) 的 DataFrame。

使用以下命令读取名为employee.json 的 JSON 文档。数据显示为一个表,字段为:id、name 和 age。

scala> val dfs = sqlContext.read.json("employee.json")

输出 - 字段名称自动从employee.json 获取。

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

显示数据

如果要查看 DataFrame 中的数据,请使用以下命令。

scala> dfs.show()

输出 - 你可以在表格格式中看到员工数据。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

使用 printSchema 方法

如果要查看 DataFrame 的结构(模式),请使用以下命令。

scala> dfs.printSchema()

输出

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

使用 Select 方法

使用以下命令从 DataFrame 中获取三个列中的name列。

scala> dfs.select("name").show()

输出 - 你可以看到name列的值。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

使用 Age 过滤器

使用以下命令查找年龄大于 23 (age > 23) 的员工。

scala> dfs.filter(dfs("age") > 23).show()

输出

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

使用 groupBy 方法

使用以下命令计算相同年龄的员工人数。

scala> dfs.groupBy("age").count().show()

输出 - 两名员工的年龄为 23。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

以编程方式运行 SQL 查询

SQLContext 使应用程序能够在运行 SQL 函数的同时以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

通常,在后台,SparkSQL 支持两种不同的方法将现有的 RDD 转换为 DataFrame:

序号 方法和描述
1 使用反射推断模式

此方法使用反射来生成包含特定类型对象的 RDD 的模式。

2 以编程方式指定模式

创建 DataFrame 的第二种方法是通过编程接口,允许你构建模式,然后将其应用于现有的 RDD。

广告
© . All rights reserved.