如何将PySpark数据框按行分成两个数据框?


PySpark 数据框定义为可在不同机器上使用的分布式数据集合,并将结构化数据生成到命名列中。“切片”一词通常用于表示数据的划分。在 Python 中,我们有一些内置函数,如 limit()、collect()、exceptAll() 等,可用于将 PySpark 数据框按行分成两个数据框。

语法

以下语法在示例中使用:

limit()

这是 Python 中的内置方法,可用于通过指定整数值来设置行的范围。

subtract()

subtract() 方法返回行结果,形成一个新数据框,其中不包含另一个数据框中的数据。

collect()

Pyspark collect 用于检索给定数据集中的所有元素,它通过循环和变量使用。

createDataFrame()

这是 Python 中的内置方法,它采用 schema 参数来定义数据框的模式。

[: before_slicing] [after_slicing :]

以上表示法称为列表切片,它将用于将数据框分成两行。

head()

通常,Python 中的 head() 方法表示数据表中的前 5 行,但此处它接受某些参数并根据给定条件返回结果。

exceptAll()

这是 Python 中的内置函数,它遵循 PySpark 模块,返回包含 DataFrame 中的行但不在另一个 DataFrame 中的新 DataFrame,同时保留重复项。

count()

这是 Python 中的内置函数,它将用于返回指定数量的行作为结果。

drop()

drop 方法消除特定行或列。

Window.orderBy()

PySpark 窗口函数通过计算结果(例如行号或排名)来定义。orderBy() 是对数据进行分区的唯一方法。

安装要求

pip install pyspark

此必要的命令用于安装有助于运行 PySpark 程序的工具。

使用 Limit() 和 Subtract() 方法

limit() 和 subtract 方法用于将单个数据转换为两个行数据框。limit() 用于通过为其分配整数值来设置特定数量的行,而 subtract 方法可用于包含另一个 DataFrame 中不存在的唯一行。

示例

在以下示例中,我们将首先导入 pyspark 和 SparkSession 模块,这将创建数据框的会话。然后在变量 rows 中设置值作为行数据。接下来,在变量 cols 中设置数据的列值。现在使用名为 createDataFrame() 的方法和 SparkSession 模块来设置行和列,这定义了数据框的两个不同模式,并将其存储在变量 df_first 中。然后初始化变量 df_second,它将值设置为名为 subtract() 的内置函数,该函数接受名为变量 df_first 的参数,这将导致返回新的数据框。最后,我们对这两个变量 df_first 和 df_second 使用 show() 方法来获取结果。

# Import the PySpark module
import pyspark
from pyspark.sql 
import SparkSession
# Create the session
Spark_Session = SparkSession.builder.appName(
   'EMPLOYEE DATA'
).getOrCreate()
# rows of Dataframe
rows = [['1', 'RAHUL', 'INDIA','1243'],
   ['2','PETER', 'SRI LANKA','5461'],
   [ '3',' JOHN', 'SOUTH KOREA','2224'],
   [ '4', 'MARK', 'NEWYORK','9985'],
   [ '5', 'SUNNY', 'BANGLADESH','8912']
]
# Columns of DataFrame
cols = ['S.N', 'EMPLOYEE NAME', 'COUNTRY', 'EMP_ID']
# DataFrame creation for rows and columns
df = Spark_Session.createDataFrame(rows, cols)
# Getting the first two slicing of rows
df_first = df.limit(2)
# Getting the second slicing by removing the variable df1
df_second = df.subtract(df_first)
# first slice with 2 rows with columns names
df_first.show()
# Second slice with 3 rows with columns names
df_second.show()

输出

+---+-------------+---------+------+
|S.N|EMPLOYEE NAME|  COUNTRY|EMP_ID|
+---+-------------+---------+------+
|  1|        RAHUL|    INDIA|  1243|
|  2|        PETER|SRI LANKA|  5461|
+---+-------------+---------+------+

+---+-------------+-----------+------+
|S.N|EMPLOYEE NAME|    COUNTRY|EMP_ID|
+---+-------------+-----------+------+
|  3|         JOHN|SOUTH KOREA|  2224|
|  5|        SUNNY| BANGLADESH|  8912|
|  4|         MARK|    NEWYORK|  9985|
+---+-------------+-----------+------+

使用 Collect() 和 CreateDataFrame() 方法

collect 方法用于从给定数据中检索所有元素,而 createDataFrame() 将数据框的两个模式分开。

请注意,模式由表的结构定义。

示例

在以下示例中,首先使用 SparkSession 创建会话。然后初始化变量 data,它将以列表格式设置行数据。然后使用带有 spark 的 createDataframe() 方法,该方法接受参数 - data(给定的行)和 ["Name", "Age"](设置列的名称)。为了获取行列表,它将使用 collect() 方法作为变量 df 的对象引用,并将其存储在变量 rows 中。接下来,我们在变量 rows1 和 rows2 中分别使用两个列表切片,即之前和之后。继续使用内置方法 createDataframe(),该方法接受两个参数 - name_of_rows(rows1 和 rows2)和 df.schema(设置数据框的模式),并将其分别存储在变量 df1 和 df2 中。最后,它将对这两个变量 df1 和 df2 使用 show 函数来获取结果。

from pyspark.sql 
import SparkSession
# Create the Spark session
spark = SparkSession.builder.appName("EMPLOYEE DATA").getOrCreate()
# Create the sample dataframe
data = [("Vivek", 31), ("Aman", 20), ("Sohan", 13), ("David", 24)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Getting the list of row objects using the collect function
rows = df.collect()
# Getting two rows of the list by using slicing
rows1 = rows[:2]
rows2 = rows[2:]
# Convert the lists of Rows to PySpark DataFrames
df1 = spark.createDataFrame(rows1, df.schema)
df2 = spark.createDataFrame(rows2, df.schema)
# result
df1.show()
df2.show()

输出

+-----+---+

| Name|Age|
+-----+---+
|Vivek| 31|
| Aman| 20|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|Sohan| 13|
|David| 24|
+-----+---+

使用 Count()、Filter() 和 Drop() 方法

在此程序中,将数据框分成两个行数据框需要 count() 和 filter() 方法,它们将划分特定唯一行。count() 返回总行数,而 filter() 用于划分 DataFrame 的两行。然后,Drop() 方法删除表示数据框划分的行。

示例

在以下示例中,首先构建 spark 会话,然后在名为 data 的变量中设置行数据。然后使用带有 spark 的 createDataFrame() 设置列名称,该方法接受数据框的两个参数 - data(设置行)和列表(设置列名),并将其存储在变量 df 中。然后在变量 total_rows 中使用 df.count() 来查找总行数。接下来,在变量 n_rows_first_df 中定义第一个数据框的行数。然后,我们使用内置方法 row_number()、over() 和 Window.orderBy() 将行号列添加到数据框中。现在使用内置方法 filter() 将数据框分成两个不同的行,并将其存储在其各自的变量中。最后,它将对两个不同的变量使用两个 show() 方法,以两个行数据框的形式获取结果。

from pyspark.sql 
import SparkSession, Window
from pyspark.sql.functions import row_number
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create the original DataFrame
data = [("Rabina", 35), ("Stephen", 31), ("Raman", 33), ("Salman", 44),("Meera",37)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Get the total number of rows
total_rows = df.count()
# Define the number of rows for the first DataFrame
n_rows_first_df = 2
# Add a row number column to the DataFrame
df_with_row_number = df.withColumn("row_number", row_number().over(Window.orderBy("Name")))
# Slice the DataFrame into two using filter()
first_df = df_with_row_number.filter(df_with_row_number.row_number <= n_rows_first_df).drop("row_number")
second_df = df_with_row_number.filter(df_with_row_number.row_number > n_rows_first_df).drop("row_number")
# Show the resulting DataFrames
first_df.show()
second_df.show()

输出

+------+---+
|  Name|Age|
+------+---+
| Meera| 37|
|Rabina| 35|
+------+---+

+-------+---+
|   Name|Age|
+-------+---+
|  Raman| 33|
| Salman| 44|
|Stephen| 31|
+-------+---+

使用 Head() 和 ExceptAll() 方法

将数据框分成两个行数据框,它使用 head() 和 exceptAll() 这两种方法,这些方法将用于分离具有唯一数据行的两个数据框。

示例

在以下示例中,它使用内置方法 count 获取总行数。然后它在变量 n_rows_first_df 中分配第一个 DataFrame 的行数。为了创建两个数据框,它将使用三个不同的内置函数,如 head()、createDataFrame() 和 exceptAll(),并将它们存储在其各自的变量中。最后,它将使用两个 show() 函数来获取两个行数据框。

from pyspark.sql 
import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the original DataFrame
data = [("karisma", 25), ("Bobby", 30), ("Champak", 35), ("Mark", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Get the total number of rows
total_rows = df.count()

# Define the number of rows for the first DataFrame
n_rows_first_df = 2

# Slice the DataFrame into two using head() and exceptAll()
first_rows = df.head(n_rows_first_df)
first_df = spark.createDataFrame(first_rows, df.schema)
second_df = df.exceptAll(first_df)

# Show the resulting DataFrames
first_df.show()
second_df.show()

输出

+-------+---+
|   Name|Age|
+-------+---+
|karisma| 25|
|  Bobby| 30|
+-------+---+

+-------+---+
|   Name|Age|
+-------+---+
|Champak| 35|
|   Mark| 40|
+-------+---+

结论

我们讨论了四种独特的方法来将 PySpark 数据框按行分成两个数据框。所有这些方法都具有表示数据框划分的独特方式。PySpark 数据框是高级交互式数据,可供数据工程师和数据科学家使用。Spark 和 ML 的 Python API 是可视化 PySpark 数据框的常见示例。

更新于:2023年7月17日

693 次浏览

启动您的职业生涯

完成课程获得认证

开始
广告