PySpark DataFrame中的全外部连接


全外部连接是一种操作,它结合了左外部连接和右外部连接的结果。在 PySpark 中,它用于基于特定条件连接两个 DataFrame,其中无论是否匹配,两个 DataFrame 的所有记录都包含在输出中。本文将详细解释如何在 PySpark 中执行全外部连接,并提供一个实际示例来说明其实现。

安装和设置

在 PySpark 中执行全外部连接之前,我们需要设置一个工作环境。首先,我们需要通过在终端运行命令 **“pip install pyspark”** 来安装 PySpark。其次,我们需要通过运行以下命令导入必要的模块:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

语法

在 PySpark 中执行全外部连接的语法如下:

df_full = df1.join(df2, (df1.column_name == df2.column_name), 'full')

算法

  • 首先,我们导入必要的模块,即 **SparkSession** 和 **col**。

  • 我们使用 **builder()** 方法创建一个 SparkSession 对象,并指定应用程序名称和主节点 URL。

  • 我们从 CSV 文件读取数据,并使用 **read.csv()** 方法将它们转换为 DataFrame。在本例中,我们使用的是虚拟 DataFrame。

  • 我们使用 join() 方法执行全外部连接操作,并将条件作为参数传递。

  • 我们使用 **show()** 方法显示生成的 DataFrame。

示例

让我们考虑两个 DataFrame,“sales_df” 和 “customer_df”。“sales_df” 包含公司销售信息,“customer_df” 包含客户购买信息。我们希望根据 “customer_id” 列连接这两个 DataFrame,并获取这两个 DataFrame 的所有记录。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession object
spark = SparkSession.builder.appName("Full Outer Join").getOrCreate()

# Create sample dataframes
data_sales = [("S1", "Product1", 100), 
              ("S2", "Product2", 200), 
              ("S3", "Product3", 300),
              ("S4", "Product4", 400),
              ("S5", "Product5", 500),
              ("S6", "Product6", 600),
              ("S7", "Product7", 700),
              ("S8", "Product8", 800),
              ("S9", "Product9", 900),
              ("S10", "Product10", 1000)]
df_sales = spark.createDataFrame(data_sales, ["sale_id", "product", "amount"])

data_customers = [("C1", "John"), 
                  ("C2", "Jane"), 
                  ("C3", "Mike"), 
                  ("C4", "Emily"), 
                  ("C5", "Bob"), 
                  ("C6", "Alice"),
                  ("C7", "Dave"), 
                  ("C8", "Jenny"), 
                  ("C9", "Peter"), 
                  ("C10", "Sarah")]
df_customers = spark.createDataFrame(data_customers, ["customer_id", "name"])

# Perform the full outer join operation
df_full = df_sales.join(df_customers, (df_sales.sale_id == df_customers.customer_id), 'full')

# Display the resultant dataframe
df_full.show()

输出

sale_id	product	amount	customer_id	name
S1	      Product1	 100	       C1	   John
S2	      Product2	 200	       C2	   Jane
S3	      Product3	 300	       C3	   Mike
S4	      Product4	 400	       C4	   Emily
S5	      Product5	 500	       C5	   Bob
S6	      Product6	 600	       C6	   Alice
S7	      Product7	 700	       C7	   Dave
S8	      Product8	 800	       C8	   Jenny
S9	      Product9	 900	       C9	   Peter
S10	      Product10 1000	       C10	   Sarah

这段代码创建了两个 DataFrame,df_sales 和 df_customers,每个 DataFrame 包含 10 组示例数据。df_sales DataFrame 包含三个变量:销售 ID、商品和金额。df_customers DataFrame 包含两个变量:客户 ID 和姓名。然后,使用 join() 方法和 full join 类型在两个 DataFrame 之间执行全外部连接操作。连接必须满足 df_customers 中的 customer_id 列和 df_sales 中的 sales_id 列匹配。

然后,脚本使用 show() 方法显示最终的 DataFrame。这样,组合后的 DataFrame df_full 中就会显示来自两个 DataFrame 的列。如果一个 DataFrame 中的条目在另一个 DataFrame 中没有对应的记录,则缺失值将被替换为 null。

应用

在处理可能包含缺失数据或空值的大型数据库时,全外部连接是一个有用的操作。它可以应用于各种场景,包括数据清理、组合来自不同来源的数据以及分析来自不同领域的的数据。

结论

基于预定义条件,可以使用称为全外部连接的强大操作来组合来自两个 DataFrame 的数据。通过将条件作为参数传递给 PySpark 中的 join() 方法,我们可以执行全外部连接。按照本文提供的说明,并利用其在数据分析和处理任务中的优势,您可以轻松地在 PySpark 中执行全外部连接。

更新于:2023年5月8日

2K+ 次浏览

启动您的 职业生涯

通过完成课程获得认证

开始
广告