使用 Apache Spark 在 Python 中清洗数据


在当今时代,随着大量数据的高速流动,Apache Spark 作为一个开源的大数据处理框架,成为了一个普遍的选择,因为它允许对数据进行并行和分布式处理。对这些数据的清洗是一个重要的步骤,Apache Spark 为我们提供了各种工具和方法来清洗数据。在本教程中,我们将了解如何在 Python 中使用 Apache Spark 清洗数据,具体步骤如下:

  • 将数据加载到 Spark DataFrame 中 - SparkSession.read 方法允许我们从各种来源读取数据,例如 CSV、JSON、Parquet 等。

  • 处理缺失值 - DataFrame.dropna 或 DataFrame.fillna 方法分别允许我们删除包含任何缺失值的数据或用特定值填充缺失值。

  • 处理重复项 - 数据中经常包含重复条目。为了处理这种情况,DataFrame.dropDuplicates 方法允许我们从 DataFrame 中删除重复项。

  • 处理异常值 - DataFrame.filter 方法允许我们删除所有包含异常值的行。

  • 处理数据类型转换 - 为了转换列的数据类型,我们有一个名为 DataFrame.cast 的方法。

在继续学习如何使用 PySpark 清洗数据之前,我们必须安装 PySpark 库。为此,我们必须在终端中运行以下命令:

pip install pyspark

处理缺失值

在 Apache Spark 中处理缺失值涉及到识别和处理存储在 Spark DataFrame 中的数据集中缺失或不完整的数据。在 Spark 中处理缺失值有几种方法,包括:

  • 删除包含缺失值的记录 - 这涉及到从 DataFrame 中删除包含缺失值的记录。

  • 估算缺失值 - 这涉及到用计算出的值(例如列中数据的平均值、中位数或众数)替换缺失值。

  • 填充缺失值 - 这涉及到用特定值(例如零或默认值)替换缺失值。

  • 插值缺失值 - 这涉及到使用数学方法(例如线性插值或样条插值)来估计缺失值。

处理缺失值的方法取决于数据分析的具体需求和目标。以一致且可重复的方式处理缺失值非常重要,以确保数据的完整性和结果的准确性。

在 Apache Spark 中,pyspark.sql.DataFrame 和 pyspark.sql.DataFrameNaFunctions 模块提供的函数可用于处理缺失值。这些函数包括 dropna、fillna 和 interpolate。

示例

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MissingData").getOrCreate()

# Create a sample data frame
data = [("John", 25, None), ("Jane", 30, 35.5), ("Jim", None, 40.0), ("Joan", 32, None)]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Replacing Missing Values with Mean
from pyspark.sql.functions import mean
mean_age = df.agg(mean(df["Age"])).first()[0]
mean_salary = df.agg(mean(df["Salary"])).first()[0]
df = df.fillna({"Age": mean_age, "Salary": mean_salary})

# Display the cleaned data frame
print("Cleaned Data Frame:")
df.show()

spark.stop()

输出

Original Data Frame:
+----+----+------+
|Name| Age|Salary|
+----+----+------+
|John|  25|  null|
|Jane|  30|  35.5|
| Jim|null|  40.0|
|Joan|  32|  null|
+----+----+------+

Cleaned Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25| 37.75|
|Jane| 30|  35.5|
| Jim| 29|  40.0|
|Joan| 32| 37.75|
+----+---+------+

处理重复项

在 Apache Spark 中处理重复项涉及到识别和处理存储在 Spark DataFrame 中的数据集中重复的记录。在 Spark 中处理重复项有几种方法,包括:

  • 删除重复项 - 这涉及到识别并从 DataFrame 中删除重复的记录。dropDuplicates 函数可用于在 Spark 中删除重复记录。

  • 保留重复项 - 这涉及到保留 DataFrame 中重复记录的所有实例,通常是为每个记录添加唯一的标识符或索引。

  • 标记重复项 - 这涉及到标记 DataFrame 中的重复记录,但不删除它们,以便可以进一步分析或处理它们。

处理重复项的方法取决于数据分析的具体需求和目标。以一致且可重复的方式处理重复项非常重要,以确保数据的完整性和结果的准确性。

在 Apache Spark 中,dropDuplicates 函数可用于删除 DataFrame 中的重复记录。该函数以一个或多个列作为输入,并删除指定列中的值完全相同的记录。dropDuplicates 函数返回一个新的 DataFrame,其中已删除重复记录。

示例

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DuplicateData").getOrCreate()

# Create a sample data frame
data = [("John", 25, 90.0), ("Jane", 30, 35.5), ("Jim", 20, 200.0), ("Joan", 32, 50.0),
   ("John", 25, 90.0), ("Jim", 20, 200.0)]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Remove duplicates
df = df.dropDuplicates()

# Display the cleaned data frame
print("Cleaned Data Frame:")
df.show()

spark.stop()

输出

Original Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.5|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
|John| 25|  90.0|
| Jim| 20| 200.0|
+----+---+------+

Cleaned Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|Jane| 30|  35.5|
|John| 25|  90.0|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
+----+---+------+

处理异常值

在 Apache Spark 中处理异常值是指识别并删除或转换数据集中被认为是极端值或超出正常值范围的值的过程。异常值会对统计分析的结果产生重大影响,因此通常需要以某种方式处理它们。

在 Apache Spark 中处理异常值有几种常见的方法,包括:

删除包含异常值的记录:这涉及到过滤掉特定列的值超出指定范围或超出平均值一定标准差的记录。

  • 用平均值或中位数替换异常值 - 这涉及到用列中剩余值的平均值或中位数替换被认为是异常值的那些值。

  • Winsorize 异常值 - 这涉及到用指定的百分位数值(例如第 5 或 95 百分位数)替换异常值。

  • 裁剪异常值 - 这涉及到用指定的最大值或最小值替换异常值。

为了在 Apache Spark 中处理异常值,您可以使用 pyspark.sql.functions 模块中提供的内置函数来计算平均值和标准差等统计数据,然后使用 filter 或 withColumn 方法根据需要删除或替换异常值。

示例

from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, stddev, abs

spark = SparkSession.builder.appName("OutlierHandlingExample").getOrCreate()

# Create a sample data frame
data = [("John", 25, 90.0), ("Jane", 30, 35.5), ("Jim", 20, 200.0), ("Joan", 32, 50.0)]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Calculate mean and standard deviation
mean_salary = df.agg(mean(df["Salary"])).first()[0]
stddev_salary = df.agg(stddev(df["Salary"])).first()[0]

# Identify and filter out outliers
df = df.filter(abs(df["Salary"] - mean_salary) < stddev_salary)

# Display the cleaned data frame
print("Cleaned Data Frame:")
df.show()

spark.stop()

输出

Original Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.5|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
+----+---+------+

Cleaned Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.5|
|Joan| 32|  50.0|
+----+---+------+

转换数据类型

转换数据类型是指将数据的表示形式从一种数据类型更改为另一种数据类型的过程。在数据处理和分析中,通常会遇到不同格式的数据,这些数据不适合所需的分析。在这种情况下,需要将数据类型转换为合适的格式才能正确执行分析。

例如,在 DataFrame 中,某列可能具有字符串数据类型,但该列中的值是数字。在这种情况下,需要将该列的数据类型更改为整数或浮点数,具体取决于分析的要求。类似地,某列可能具有整数数据类型,但该列中的值是日期字符串。在这种情况下,需要将该列的数据类型更改为日期类型。

转换数据类型是数据清洗和预处理中的一个重要步骤,因为它确保数据以正确的格式进行分析。

示例

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType

spark = SparkSession.builder.appName("DataTypeConversion").getOrCreate()

# Create a sample data frame
data = [("John", "25", "90"), ("Jane", "30", "35"), ("Jim", "20", "200"), ("Joan", "32", "50")]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Convert the data type of the 'Age' column to integer
df = df.withColumn("Age", df["Age"].cast(IntegerType()))

# Convert the data type of the 'Salary' column to float
df = df.withColumn("Salary", df["Salary"].cast(FloatType()))

# Display the converted data frame
print("Converted Data Frame:")
df.show()

spark.stop()

输出

Original Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|   90|
|Jane| 30|   35|
| Jim| 20|   200|
|Joan| 32|   50|
+----+---+------+

Converted Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.0|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
+----+---+------+

结论

在 Apache Spark 中清洗数据是数据准备过程中的一个重要部分。Apache Spark 为我们提供了一个强大且高效的平台来处理大型数据集,并帮助我们同时执行各种数据清洗任务,例如处理缺失值、重复项等。pyspark.sql.functions 模块为我们提供了大量函数,这些函数与在分布式环境中执行复杂操作的能力相结合,使 Apache Spark 成为数据清洗和准备的完美选择。通过使用正确的工具和技术,我们可以为分析、机器学习或任何其他类型的数据驱动应用程序准备数据,从而提高结果的准确性。

更新于: 2023年10月4日

728 次查看

开启您的 职业生涯

通过完成课程获得认证

开始学习
广告