创建 PySpark DataFrame


在大数据分析中,PySpark 是一个将流行的编程语言 Python 与开源大数据框架 Apache Spark 相结合的栈。PySpark 提供了一个优秀的大数据分析接口,而这个栈中的一个重要组成部分是 Spark 的 DataFrame API。在这里,我们将为想要创建 PySpark DataFrame 的人提供技术指南,其中包括有用的提示和实际示例。

pyspark 的主要优势是什么?哪些行业主要使用它?

Pyspark 是 Apache Spark 的 Python API,Apache Spark 是一个分布式计算框架,它提供快速、可扩展和容错的数据处理。Pyspark 的一些主要优势包括:

  • 可扩展性 - Pyspark 可以处理大型数据集,并且可以轻松地进行扩展或缩减以满足不断变化的数据处理需求。

  • 速度 - Pyspark 旨在进行快速数据处理,可以快速有效地处理大型数据集。

  • 容错性 - Pyspark 旨在具有容错性,这意味着它可以在不丢失数据或影响性能的情况下从硬件或软件故障中恢复。

  • 灵活性 - Pyspark 可用于各种数据处理任务,包括批处理、流处理、机器学习和图处理。

  • 与其他技术的集成 - Pyspark 可以与各种其他技术集成,包括 Hadoop、SQL 和 NoSQL 数据库。

使用 Pyspark 的行业包括:

  • 金融服务 - Pyspark 用于金融服务中的风险分析、欺诈检测和其他数据处理任务。

  • 医疗保健 - Pyspark 用于医疗保健中的医学影像分析、疾病诊断和其他数据处理任务。

  • 零售 - Pyspark 用于零售中的客户细分、销售预测和其他数据处理任务。

  • 电信 - Pyspark 用于电信中的网络分析、呼叫数据分析和其他数据处理任务。

总的来说,Pyspark 为可扩展和快速的数据处理提供了一个强大的平台,可用于各种行业和应用。

第一部分:创建 SparkSession

在 PySpark 中创建 DataFrame 之前,必须首先创建一个 SparkSession 来与 Spark 交互。SparkSession 用于创建 DataFrame、将 DataFrame 注册为表以及执行 SQL 查询。

语法

from pyspark.sql import SparkSession # create a SparkSession spark = SparkSession.builder \ .appName('my_app_name') \ .config('spark.some.config.option', 'some-value') \ .getOrCreate()
  • `appName` 指定 Spark 应用程序的名称。

  • `config` 用于设置配置属性,例如数据存储选项。

  • `getOrCreate` 将创建一个新的 SparkSession,或者如果已存在则获取现有 SparkSession。

Explore our latest online courses and learn new skills at your own pace. Enroll and become a certified expert to boost your career.

第二部分:从 CSV 文件创建 DataFrame

创建 PySpark DataFrame 的最常见方法之一是从 CSV 文件加载数据。为此,您应该

语法

# load data from csv file df = spark.read.csv('path/to/myfile.csv', header=True)

`header=True` 告诉 Spark CSV 文件的第一行包含标题。

第三部分:从 SQL 查询创建 DataFrame

从 SQL 查询的结果创建 DataFrame 也是 PySpark 中的常见做法。为此 -

# create a Spark DataFrame from a SQL query df = spark.sql('SELECT * FROM my_table')

`spark.sql` 从 SQL 查询创建 DataFrame。

第四部分:从 RDD 创建 DataFrame

PySpark 还允许您从 RDD 创建 DataFrame。这是一个示例 -

# create a RDD rdd = spark.sparkContext.parallelize([(1, "John"), (2, "Sarah"), (3, "Lucas")]) # create a DataFrame df = spark.createDataFrame(rdd, ["Id", "Name"])
  • `parallelize` 从 Python 列表创建 RDD。

  • `createDataFrame` 从 RDD 创建 DataFrame。

第五部分:操作 DataFrame

创建 PySpark DataFrame 后,您通常需要对其进行操作。以下是一些常见操作:

选择列

# select two columns df.select('column1', 'column2')

过滤数据

# filter rows with a condition df.filter(df.column1 > 100)

分组数据

# group by column1 and calculate the mean of column2 df.groupby('column1').mean('column2')

连接 DataFrame

# 连接两个 DataFrame df1.join(df2, df1.id == df2.id)

最终程序,代码

# Creating a session from pyspark.sql import SparkSession # create a SparkSession spark = SparkSession.builder \ .appName('my_app_name') \ .config('spark.some.config.option', 'some-value') \ .getOrCreate() # Dataframe from CSV # load data from csv file df = spark.read.csv('path/to/myfile.csv', header=True) # data frame from SQL query # create a Spark DataFrame from a SQL query df = spark.sql('SELECT * FROM my_table') #Dataframe from RDD # create a RDD rdd = spark.sparkContext.parallelize([(1, "John"), (2, "Sarah"), (3, "Lucas")]) # create a DataFrame df = spark.createDataFrame(rdd, ["Id", "Name"])

输出

The output will be in the form a of a dataframe which can be accessed from different sources using different methods.

结论

在 PySpark 中创建 DataFrame 是大数据分析中的一项基本技能。通过使用 SparkSession,您可以使用 CSV 文件、SQL 查询或 RDD 创建 DataFrame。创建 DataFrame 后,您可以通过多种方式对其进行操作,例如选择列、过滤数据、分组数据和连接 DataFrame。使用这些方法,您可以为您的数据分析需求创建定制管道。

更新于:2023年4月25日

1K+ 次浏览

开启您的 职业生涯

通过完成课程获得认证

开始学习
广告