从 PySpark 数据框中获取特定行
PySpark 是一个强大的数据处理和分析工具。在使用 PySpark DataFrame 处理数据时,有时需要从数据框中获取特定行。它帮助用户以分布式和并行的方式轻松操作和访问数据,使其成为大数据应用的理想选择。在本文中,我们将探讨如何使用 PySpark 中的各种方法从 PySpark 数据框中获取特定行。我们将介绍使用 PySpark 的 DataFrame API 进行函数式编程的方法。
在继续之前,让我们创建一个示例数据框,从中获取行。
from colorama import Fore from pyspark.sql import SparkSession # Building a SparkSession named "column_sum" spark = SparkSession.builder.appName("column_sum").getOrCreate() # Creating the Spark DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Printing the schema of the DataFrame df.printSchema() # Showing the DataFrame df.show()
输出
此 Python 脚本将首先打印我们创建的数据框的模式,然后打印数据框本身。
root |-- __: string (nullable = true) |-- Col1: long (nullable = true) |-- Col2: long (nullable = true) |-- Col3: long (nullable = true) +----+----+----+----+ | __|Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| |Row2| 4| 5| 6| |Row3| 7| 8| 9| +----+----+----+----+
下面提到了可以用来完成任务的方法
方法
使用 collect()
使用 first()
使用 show()
使用 head()
使用 tail()
使用 select() 和 collect()
使用 filter() 和 collect()
使用 where() 和 collect()
使用 take()
现在让我们讨论每种方法以及如何使用它们来添加列。
方法 1:使用 collect()
在 PySpark 中,collect() 方法可用于从 PySpark DataFrame 中检索所有数据并将其作为列表返回。此函数通常用于查看或操作数据框中的数据。以下是使用的语法
dataframe.collect()[index]
这里
dataframe 是我们应用此方法的数据框
Index 是我们要获取的行。
将数据框以列表的形式获取后,我们可以将表示所需行的索引传递给列表。
算法
首先,使用上面的代码创建一个数据框。
使用 collect() 函数从 DataFrame 中检索所需的行,并将每一行存储在单独的变量中。
将包含所需行的变量的值打印到控制台。
示例
# Retrieving the first row of the DataFrame using collect() function Row1 = df.collect()[0] print(Row1) # Retrieving the last row of the DataFrame using collect() function Row2 = df.collect()[-1] print(Row2) # Retrieving the second row of the DataFrame using collect() function Row3 = df.collect()[1] print(Row3)
输出
Row(__='Row1', Col1=1, Col2=2, Col3=3) Row(__='Row3', Col1=7, Col2=8, Col3=9) Row(__='Row2', Col1=4, Col2=5, Col3=6)
方法 2:使用 first()
PySpark 中的 first() 函数返回数据框或 RDD 的第一个元素。我们可以使用此函数从数据框中提取特定行。此函数通常用于查看数据框中的数据。以下是使用的语法
dataframe.first()
这里
dataframe 是我们应用此方法的数据框
算法
导入必要的库
创建 SparkSession
创建 DataFrame
使用 first() 函数检索 DataFrame 的第一行
将第一行打印到控制台
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("column_sum").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Retrieve the first row Row1 = df.first() print(Row1)
输出
Row(Row1, 1, 2, 3)
方法 3:使用 show()
在 PySpark 中,show() 函数用于显示 Python 数据框中前 n 行。此函数的返回值是由前 n 行组成的小型数据框。以下是使用的语法
dataframe.show(n)
这里
dataframe 是我们应用此方法的数据框
n 是行数
算法
导入必要的库
创建 SparkSession
创建 DataFrame
通过将行参数传递为 1,使用 show() 函数检索 DataFrame 的第一行
将第一行打印到控制台
通过将行参数传递为 2,使用 show() 函数检索 DataFrame 的前两行
将前两行打印到控制台
通过将行参数传递为 3,使用 show() 函数检索 DataFrame 的前三行
将前三行打印到控制台
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("column_sum").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Retrieve the first row df1= df.show(1) print(df1) # Retrieve the first two rows df2= df.show(2) print(df2) # Retrieve the first three rows df3= df.show(3) print(df3)
输出
+----+----+----+----+ |__ |Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| +----+----+----+----+ +----+----+----+----+ |__ |Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| |Row2| 4| 5| 6| +----+----+----+----+ +----+----+----+----+ |__ |Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| |Row2| 4| 5| 6| |Row3| 7| 8| 9| +----+----+----+----+
方法 4:使用 head()
在 PySpark 中,head() 函数用于显示 Python 数据框中前 n 行。此函数的返回值是由前 n 行组成的小型数据框。以下是使用的语法
dataframe.head(n)
这里
dataframe 是我们应用此方法的数据框
n 是行数
算法
导入必要的库
创建 SparkSession
创建 DataFrame
通过将行参数传递为 1,使用 head() 函数检索 DataFrame 的第一行
将第一行打印到控制台
通过将行参数传递为 2,使用 head() 函数检索 DataFrame 的前两行
将前两行打印到控制台
通过将行参数传递为 3,使用 head() 函数检索 DataFrame 的前三行
将前三行打印到控制台
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("column_sum").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Retrieve the first row df1= df.head(1) print(df1) # Retrieve the first two rows df2= df.head(2) print(df2) # Retrieve the first three rows df3= df.head(3) print(df3)
输出
[Row(__='Row1', Col1=1, Col2=2, Col3=3)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
方法 5:使用 tail()
在 PySpark 中,tail() 函数用于显示 Python 数据框中最后 n 行。此函数的返回值是由最后 n 行组成的小型数据框。以下是使用的语法
dataframe.tail(n)
这里
dataframe 是我们应用此方法的数据框
n 是行数
算法
导入必要的库
创建 SparkSession
创建 DataFrame
通过将行参数传递为 1,使用 tail() 函数检索 DataFrame 的最后一行
将最后一行打印到控制台
通过将行参数传递为 2,使用 tail() 函数检索 DataFrame 的最后两行
将最后两行打印到控制台
通过将行参数传递为 3,使用 tail() 函数检索 DataFrame 的最后三行
将最后三行打印到控制台
示例
# Create a SparkSession spark = SparkSession.builder.appName("column_sum").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Retrieve the last row df1= df.tail(1) print(df1) # Retrieve the last two rows df2= df.tail(2) print(df2) # Retrieve the last three rows df3= df.tail(3) print(df3)
输出
[Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
方法 6:使用 select() 和 collect()
我们可以结合使用 select() 函数和 collect() 方法来显示 Pyspark 数据框中的特定行。以下是使用的语法
dataframe.select([columns]).collect()[index]
这里
dataframe 是我们应用此方法的数据框
columns 是我们想要在输出中包含的列的列表。
Index 是我们想要在输出中包含的行号。
算法
导入必要的库
创建 SparkSession
创建 DataFrame
结合使用 select() 函数和 collect() 函数从 DataFrame 中检索所需的行,并将每一行存储在单独的变量中。
将包含所需行的变量的值打印到控制台。
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("column_sum").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Retrieve the last row df1= df.select(['Col1', 'Col2', 'Col3']).collect(0) print(df1) # Retrieve the last two rows df2= df.select(['Col1', 'Col2', 'Col3']).collect(-1) print(df2) # Retrieve the last three rows df3= df.select(['Col1', 'Col2', 'Col3']).collect(1) print(df3)
输出
[Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
方法 7:使用 filter() 和 collect()
我们可以结合使用 filter() 函数和 collect() 方法来显示 Pyspark 数据框中的特定行。以下是使用的语法
dataframe.filter(condition).collect()[index]
这里
dataframe 是我们应用此方法的数据框
Condition 是根据其过滤数据框行的条件。
Index 是我们想要在输出中包含的行号。
算法
导入必要的库
创建 SparkSession
创建 DataFrame
结合使用 filter() 函数和 collect() 函数从 DataFrame 中检索所需的行,并将每一行存储在单独的变量中。
将包含所需行的变量的值打印到控制台。
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("filter_collect_example").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Filter the DataFrame df1 = df.filter(df.Col1 > 1).collect()[0] # Print the collected data print(df1) # Filter the DataFrame df2 = df.filter(df.Col1 > 1).collect()[1] # Print the collected data print(df2) # Filter the DataFrame df3 = df.filter(df.Col1 > 1).collect()[-1] # Print the collected data print(df3)
输出
Row(Col1=4, Col2=5, Col3=6) Row(Col1=7, Col2=8, Col3=9) Row(Col1=7, Col2=8, Col3=9)
方法 8:使用 where() 和 collect()
我们可以结合使用 where() 函数和 collect() 方法来显示 Pyspark 数据框中的特定行。使用 where() 方法,我们可以根据方法中传递的条件过滤特定行,然后我们可以应用 collect() 方法将结果存储在变量中。以下是使用的语法
dataframe.where(condition).collect()[index]
这里
dataframe 是我们应用此方法的数据框
Condition 是根据其过滤数据框行的条件。
Index 是我们想要在输出中包含的行号。
算法
导入必要的库
创建 SparkSession
创建 DataFrame
结合使用 where() 函数和 collect() 函数从 DataFrame 中检索所需的行,并将每一行存储在单独的变量中。
将包含所需行的变量的值打印到控制台。
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("filter_collect_example").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Filter the DataFrame df1 = df.where(df.Col1 > 1).collect()[0] # Print the collected data print(df1) # Filter the DataFrame df2 = df.where(df.Col1 > 1).collect()[1] # Print the collected data print(df2) # Filter the DataFrame df3 = df.where(df.Col1 > 1).collect()[-1] # Print the collected data print(df3)
输出
Row(Col1=4, Col2=5, Col3=6) Row(Col1=7, Col2=8, Col3=9) Row(Col1=7, Col2=8, Col3=9)
方法 9:使用 take()
在 PySpark 中,take() 函数也用于显示 Python 数据框中前 n 行。此函数的返回值是由前 n 行组成的小型数据框。以下是使用的语法
dataframe.take(n)
这里
dataframe 是我们应用此方法的数据框
n 是行数
算法
导入必要的库
创建 SparkSession
创建 DataFrame
通过将行参数传递为 1,使用 take() 函数检索 DataFrame 的第一行
将第一行打印到控制台
通过将行参数传递为 2,使用 take() 函数检索 DataFrame 的前两行
将前两行打印到控制台
通过将行参数传递为 3,使用 take() 函数检索 DataFrame 的前三行
将前三行打印到控制台
示例
# Import necessary libraries from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("column_sum").getOrCreate() # Create the DataFrame df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3']) # Retrieve the first row df1= df.take(1) print(df1) # Retrieve the first two rows df2= df.take(2) print(df2) # Retrieve the first three rows df3= df.take(3) print(df3)
输出
[Row(__='Row1', Col1=1, Col2=2, Col3=3)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
结论
根据用例的不同,每种方法的效率可能高于或低于其他方法,并且每种方法都有其自身的优点或缺点。为特定任务选择最佳方法最为重要。由于这些方法的效率高,因此也可以将其应用于大型数据集。