如何验证 PySpark 数据框列类型?
PySpark 是 Apache Spark 的 Python API,提供了一个强大且可扩展的大数据处理和分析框架。在使用 PySpark DataFrame 时,了解并验证每个列的数据类型至关重要。准确的列类型验证可确保数据完整性,并使您能够准确地执行操作和转换。在本文中,我们将探讨验证 PySpark DataFrame 列类型的各种方法,并提供示例以帮助更好地理解。
PySpark DataFrame 列类型的概述
在 PySpark 中,DataFrame 表示一个组织成命名列的分布式数据集合。每列都有一个特定的数据类型,可以是任何有效的 PySpark 数据类型,例如 IntegerType、StringType、BooleanType 等。了解列类型至关重要,因为它允许您根据预期的数据类型执行操作。
使用 printSchema() 方法
printSchema() 方法提供了一个简洁且结构化的 DataFrame 模式表示,包括列名及其对应的数据类型。它是验证列类型最简单的方法之一。
语法
df.printSchema()
此处,df.printSchema() 语法用于显示 PySpark DataFrame 的模式。它打印列名及其相应的数据类型,以及它们是否允许空值。
示例
在下面的示例中,我们创建一个 SparkSession 并为 PySpark DataFrame 定义一个模式。然后使用样本数据创建 DataFrame,其中包含名为“col1”、“col2”和“col3”的列,分别具有 IntegerType、StringType 和 DoubleType 的对应数据类型。最后,使用 printSchema() 方法打印 DataFrame 的模式,该方法显示列名及其数据类型。
from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, StringType, DoubleType # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Sample data data = [ (1, "John", 3.14), (2, "Jane", 2.71), (3, "Alice", 1.23) ] # Define the schema schema = [ ("col1", IntegerType(), True), ("col2", StringType(), True), ("col3", DoubleType(), True) ] # Create a DataFrame with the provided data and schema df = spark.createDataFrame(data, schema) # Print the DataFrame schema df.printSchema()
输出
root |-- col1: integer (nullable = true) |-- col2: string (nullable = true) |-- col3: double (nullable = true)
使用 dtypes 检查列类型
dtypes 属性返回一个元组列表,其中每个元组包含列名及其对应的数据类型。此方法允许以编程方式访问列类型。
语法
column_types = df.dtypes for column_name, data_type in column_types: print(f"Column '{column_name}' has data type: {data_type}")
此处,df.dtypes 从 PySpark DataFrame 中检索列名及其对应的数据类型作为元组列表。for 循环遍历每个元组,提取列名和数据类型,然后使用 f-字符串格式打印它们。
示例
在下面的示例中,我们使用 SparkSession 创建一个 PySpark DataFrame。它将样本数据定义为元组列表,并创建一个名为 df 的 DataFrame,其中包含“col1”、“col2”和“col3”列。df.dtypes 属性检索列名及其对应的数据类型作为元组列表。for 循环遍历每个元组,提取列名和数据类型,然后使用 f-字符串格式打印它们。
from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Sample data data = [ (1, "John", 3.14), (2, "Jane", 2.71), (3, "Alice", 1.23) ] # Create a DataFrame df = spark.createDataFrame(data, ["col1", "col2", "col3"]) # Get the column types column_types = df.dtypes # Display the column types for column_name, data_type in column_types: print(f"Column '{column_name}' has data type: {data_type}")
输出
输出显示列名(col1、col2、col3)及其对应的数据类型(int、string、double)。此信息是使用 DataFrame 的 dtypes 属性获得的,该属性返回一个元组列表,其中每个元组包含列名及其数据类型。
Column 'col1' has data type: int Column 'col2' has data type: string Column 'col3' has data type: double
使用 selectExpr() 验证列类型
selectExpr() 方法允许我们选择列并在其上应用表达式或转换。将其与 typeof() 函数结合使用,您可以直接检查特定列的数据类型。
语法
from pyspark.sql.functions import expr column_names = ["col1", "col2", "col3"] exprs = [expr(f"typeof({col}) as {col}_type") for col in column_names] df.selectExpr(*exprs).show()
此处,**typeof()** 函数检索每列的数据类型,并将其与包含“_type”的新列名关联。然后,**df.selectExpr(*exprs).show()** 将这些表达式应用于 DataFrame,选择动态创建的列并显示其结果。
示例
在下面的示例中,我们创建一个 SparkSession 并定义一个名为 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”。为了验证列类型,代码在 DataFrame 上使用 **selectExpr()** 方法。它使用列表推导式创建一个表达式列表,其中每个表达式都使用 **typeof()** 函数确定列的数据类型,并将其与包含“_type”的新列名关联。最后,df.selectExpr(*exprs).show() 将这些表达式应用于 DataFrame,选择动态创建的列及其列名和相应的数据类型。show() 方法显示结果 DataFrame。
from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Sample data data = [ (1, "John", 3.14), (2, "Jane", 2.71), (3, "Alice", 1.23) ] # Create a DataFrame df = spark.createDataFrame(data, ["col1", "col2", "col3"]) # Verify column types using selectExpr() column_names = ["col1", "col2", "col3"] exprs = [f"typeof({col}) as {col}_type" for col in column_names] df.selectExpr(*exprs).show()
输出
+---------+---------+---------+ |col1_type|col2_type|col3_type| +---------+---------+---------+ | integer| string| double| | integer| string| double| | integer| string| double| +---------+---------+---------+
使用 cast() 检查列类型
cast() 函数允许我们显式地将列转换为不同的数据类型。通过比较原始列和转换后的列,您可以验证转换是否成功,这表明原始列具有预期的数据类型。
示例
在下面的示例中,我们创建一个 SparkSession 并定义一个名为 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”,以及样本数据。代码定义了一个字典 expected_data_types,该字典指定每列的预期数据类型。for 循环遍历 expected_data_types 字典中的每个项目。在循环内,代码使用 cast() 函数尝试将列转换为预期的数据类型。它使用转换后的值创建一个新列,并将其与原始列进行比较,以识别转换成功的行。
from pyspark.sql import SparkSession from pyspark.sql.functions import col # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Sample data data = [ (1, "John", 3.14), (2, "Jane", 2.71), (3, "Alice", 1.23) ] # Create a DataFrame df = spark.createDataFrame(data, ["col1", "col2", "col3"]) # Define the expected data types expected_data_types = { "col1": "integer", "col2": "string", "col3": "double" } # Check column types using cast() for column_name, expected_type in expected_data_types.items(): cast_column = df.select(col(column_name).cast(expected_type).alias(column_name)) matched_rows = df.filter(col(column_name) == cast_column[column_name]) print(f"Column '{column_name}' has the expected data type: {expected_type}?") matched_rows.show()
输出
输出通过尝试使用 cast() 函数将其转换为预期类型来验证每列的数据类型。原始 DataFrame 根据转换后匹配的行进行过滤,如果所有行都匹配,则表明该列具有预期的数据类型。
Column 'col1' has the expected data type: integer? +----+-----+----+ |col1|col2 |col3| +----+-----+----+ | 1| John|3.14| | 2| Jane|2.71| | 3|Alice|1.23| +----+-----+----+ Column 'col2' has the expected data type: string? +----+-----+----+ |col1|col2 |col3| +----+-----+----+ | 1| John|3.14| | 2| Jane|2.71| | 3|Alice|1.23| +----+-----+----+ Column 'col3' has the expected data type: double? +----+-----+----+ |col1|col2 |col3| +----+-----+----+ | 1| John|3.14| | 2| Jane|2.71| | 3|Alice|1.23| +----+-----+----+
结论
在本文中,我们讨论了如何验证 Pyspark 数据框列类型。验证 PySpark DataFrame 列类型对于确保数据准确性和执行有意义的操作至关重要。在本文中,我们探讨了几种验证列类型的方法,包括使用 printSchema()、dtypes、selectExpr()、cast()。