PySpark DataFrame中的全外部连接
全外部连接是一种操作,它结合了左外部连接和右外部连接的结果。在 PySpark 中,它用于基于特定条件连接两个 DataFrame,其中无论是否匹配,两个 DataFrame 的所有记录都包含在输出中。本文将详细解释如何在 PySpark 中执行全外部连接,并提供一个实际示例来说明其实现。
安装和设置
在 PySpark 中执行全外部连接之前,我们需要设置一个工作环境。首先,我们需要通过在终端运行命令 **“pip install pyspark”** 来安装 PySpark。其次,我们需要通过运行以下命令导入必要的模块:
from pyspark.sql import SparkSession from pyspark.sql.functions import col
语法
在 PySpark 中执行全外部连接的语法如下:
df_full = df1.join(df2, (df1.column_name == df2.column_name), 'full')
算法
首先,我们导入必要的模块,即 **SparkSession** 和 **col**。
我们使用 **builder()** 方法创建一个 SparkSession 对象,并指定应用程序名称和主节点 URL。
我们从 CSV 文件读取数据,并使用 **read.csv()** 方法将它们转换为 DataFrame。在本例中,我们使用的是虚拟 DataFrame。
我们使用 join() 方法执行全外部连接操作,并将条件作为参数传递。
我们使用 **show()** 方法显示生成的 DataFrame。
示例
让我们考虑两个 DataFrame,“sales_df” 和 “customer_df”。“sales_df” 包含公司销售信息,“customer_df” 包含客户购买信息。我们希望根据 “customer_id” 列连接这两个 DataFrame,并获取这两个 DataFrame 的所有记录。
from pyspark.sql import SparkSession from pyspark.sql.functions import col # Create a SparkSession object spark = SparkSession.builder.appName("Full Outer Join").getOrCreate() # Create sample dataframes data_sales = [("S1", "Product1", 100), ("S2", "Product2", 200), ("S3", "Product3", 300), ("S4", "Product4", 400), ("S5", "Product5", 500), ("S6", "Product6", 600), ("S7", "Product7", 700), ("S8", "Product8", 800), ("S9", "Product9", 900), ("S10", "Product10", 1000)] df_sales = spark.createDataFrame(data_sales, ["sale_id", "product", "amount"]) data_customers = [("C1", "John"), ("C2", "Jane"), ("C3", "Mike"), ("C4", "Emily"), ("C5", "Bob"), ("C6", "Alice"), ("C7", "Dave"), ("C8", "Jenny"), ("C9", "Peter"), ("C10", "Sarah")] df_customers = spark.createDataFrame(data_customers, ["customer_id", "name"]) # Perform the full outer join operation df_full = df_sales.join(df_customers, (df_sales.sale_id == df_customers.customer_id), 'full') # Display the resultant dataframe df_full.show()
输出
sale_id product amount customer_id name S1 Product1 100 C1 John S2 Product2 200 C2 Jane S3 Product3 300 C3 Mike S4 Product4 400 C4 Emily S5 Product5 500 C5 Bob S6 Product6 600 C6 Alice S7 Product7 700 C7 Dave S8 Product8 800 C8 Jenny S9 Product9 900 C9 Peter S10 Product10 1000 C10 Sarah
这段代码创建了两个 DataFrame,df_sales 和 df_customers,每个 DataFrame 包含 10 组示例数据。df_sales DataFrame 包含三个变量:销售 ID、商品和金额。df_customers DataFrame 包含两个变量:客户 ID 和姓名。然后,使用 join() 方法和 full join 类型在两个 DataFrame 之间执行全外部连接操作。连接必须满足 df_customers 中的 customer_id 列和 df_sales 中的 sales_id 列匹配。
然后,脚本使用 show() 方法显示最终的 DataFrame。这样,组合后的 DataFrame df_full 中就会显示来自两个 DataFrame 的列。如果一个 DataFrame 中的条目在另一个 DataFrame 中没有对应的记录,则缺失值将被替换为 null。
应用
在处理可能包含缺失数据或空值的大型数据库时,全外部连接是一个有用的操作。它可以应用于各种场景,包括数据清理、组合来自不同来源的数据以及分析来自不同领域的的数据。
结论
基于预定义条件,可以使用称为全外部连接的强大操作来组合来自两个 DataFrame 的数据。通过将条件作为参数传递给 PySpark 中的 join() 方法,我们可以执行全外部连接。按照本文提供的说明,并利用其在数据分析和处理任务中的优势,您可以轻松地在 PySpark 中执行全外部连接。