如何验证 PySpark 数据框列类型?


PySpark 是 Apache Spark 的 Python API,提供了一个强大且可扩展的大数据处理和分析框架。在使用 PySpark DataFrame 时,了解并验证每个列的数据类型至关重要。准确的列类型验证可确保数据完整性,并使您能够准确地执行操作和转换。在本文中,我们将探讨验证 PySpark DataFrame 列类型的各种方法,并提供示例以帮助更好地理解。

PySpark DataFrame 列类型的概述

在 PySpark 中,DataFrame 表示一个组织成命名列的分布式数据集合。每列都有一个特定的数据类型,可以是任何有效的 PySpark 数据类型,例如 IntegerType、StringType、BooleanType 等。了解列类型至关重要,因为它允许您根据预期的数据类型执行操作。

使用 printSchema() 方法

printSchema() 方法提供了一个简洁且结构化的 DataFrame 模式表示,包括列名及其对应的数据类型。它是验证列类型最简单的方法之一。

语法

df.printSchema()

此处,df.printSchema() 语法用于显示 PySpark DataFrame 的模式。它打印列名及其相应的数据类型,以及它们是否允许空值。

示例

在下面的示例中,我们创建一个 SparkSession 并为 PySpark DataFrame 定义一个模式。然后使用样本数据创建 DataFrame,其中包含名为“col1”、“col2”和“col3”的列,分别具有 IntegerType、StringType 和 DoubleType 的对应数据类型。最后,使用 printSchema() 方法打印 DataFrame 的模式,该方法显示列名及其数据类型。

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Define the schema
schema = [
    ("col1", IntegerType(), True),
    ("col2", StringType(), True),
    ("col3", DoubleType(), True)
]

# Create a DataFrame with the provided data and schema
df = spark.createDataFrame(data, schema)

# Print the DataFrame schema
df.printSchema()

输出

root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)

使用 dtypes 检查列类型

dtypes 属性返回一个元组列表,其中每个元组包含列名及其对应的数据类型。此方法允许以编程方式访问列类型。

语法

column_types = df.dtypes
for column_name, data_type in column_types:
    print(f"Column '{column_name}' has data type: {data_type}")

此处,df.dtypes 从 PySpark DataFrame 中检索列名及其对应的数据类型作为元组列表。for 循环遍历每个元组,提取列名和数据类型,然后使用 f-字符串格式打印它们。

示例

在下面的示例中,我们使用 SparkSession 创建一个 PySpark DataFrame。它将样本数据定义为元组列表,并创建一个名为 df 的 DataFrame,其中包含“col1”、“col2”和“col3”列。df.dtypes 属性检索列名及其对应的数据类型作为元组列表。for 循环遍历每个元组,提取列名和数据类型,然后使用 f-字符串格式打印它们。

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Get the column types
column_types = df.dtypes

# Display the column types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' has data type: {data_type}")

输出

输出显示列名(col1、col2、col3)及其对应的数据类型(int、string、double)。此信息是使用 DataFrame 的 dtypes 属性获得的,该属性返回一个元组列表,其中每个元组包含列名及其数据类型。

Column 'col1' has data type: int
Column 'col2' has data type: string
Column 'col3' has data type: double

使用 selectExpr() 验证列类型

selectExpr() 方法允许我们选择列并在其上应用表达式或转换。将其与 typeof() 函数结合使用,您可以直接检查特定列的数据类型。

语法

from pyspark.sql.functions import expr

column_names = ["col1", "col2", "col3"]
exprs = [expr(f"typeof({col}) as {col}_type") for col in column_names]
df.selectExpr(*exprs).show()

此处,**typeof()** 函数检索每列的数据类型,并将其与包含“_type”的新列名关联。然后,**df.selectExpr(*exprs).show()** 将这些表达式应用于 DataFrame,选择动态创建的列并显示其结果。

示例

在下面的示例中,我们创建一个 SparkSession 并定义一个名为 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”。为了验证列类型,代码在 DataFrame 上使用 **selectExpr()** 方法。它使用列表推导式创建一个表达式列表,其中每个表达式都使用 **typeof()** 函数确定列的数据类型,并将其与包含“_type”的新列名关联。最后,df.selectExpr(*exprs).show() 将这些表达式应用于 DataFrame,选择动态创建的列及其列名和相应的数据类型。show() 方法显示结果 DataFrame。

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Verify column types using selectExpr()
column_names = ["col1", "col2", "col3"]
exprs = [f"typeof({col}) as {col}_type" for col in column_names]
df.selectExpr(*exprs).show()

输出

+---------+---------+---------+
|col1_type|col2_type|col3_type|
+---------+---------+---------+
|  integer|   string|   double|
|  integer|   string|   double|
|  integer|   string|   double|
+---------+---------+---------+

使用 cast() 检查列类型

cast() 函数允许我们显式地将列转换为不同的数据类型。通过比较原始列和转换后的列,您可以验证转换是否成功,这表明原始列具有预期的数据类型。

示例

在下面的示例中,我们创建一个 SparkSession 并定义一个名为 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”,以及样本数据。代码定义了一个字典 expected_data_types,该字典指定每列的预期数据类型。for 循环遍历 expected_data_types 字典中的每个项目。在循环内,代码使用 cast() 函数尝试将列转换为预期的数据类型。它使用转换后的值创建一个新列,并将其与原始列进行比较,以识别转换成功的行。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Define the expected data types
expected_data_types = {
    "col1": "integer",
    "col2": "string",
    "col3": "double"
}

# Check column types using cast()
for column_name, expected_type in expected_data_types.items():
    cast_column = df.select(col(column_name).cast(expected_type).alias(column_name))
    matched_rows = df.filter(col(column_name) == cast_column[column_name])
    print(f"Column '{column_name}' has the expected data type: {expected_type}?")
    matched_rows.show()

输出

输出通过尝试使用 cast() 函数将其转换为预期类型来验证每列的数据类型。原始 DataFrame 根据转换后匹配的行进行过滤,如果所有行都匹配,则表明该列具有预期的数据类型。

Column 'col1' has the expected data type: integer?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

Column 'col2' has the expected data type: string?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

Column 'col3' has the expected data type: double?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

结论

在本文中,我们讨论了如何验证 Pyspark 数据框列类型。验证 PySpark DataFrame 列类型对于确保数据准确性和执行有意义的操作至关重要。在本文中,我们探讨了几种验证列类型的方法,包括使用 printSchema()、dtypes、selectExpr()、cast()。

更新于: 2023年10月16日

1K+ 阅读量

开启你的 职业生涯

通过完成课程获得认证

立即开始
广告