在 PySpark DataFrame 中删除重复行
PySpark 是由Apache Spark社区设计的一个工具,用于实时处理数据并在本地 Python 环境中分析结果。Spark DataFrame与其他 DataFrame不同,因为它会分发信息并遵循模式。
Spark 可以处理流处理和批处理,这就是它们受欢迎的原因。PySpark DataFrame需要一个会话才能生成入口点,它在系统上执行数据处理(RAM)。您可以使用以下命令在 Windows 上安装 PySpark 模块 -
pip install pyspark
在本文中,我们将创建一个 PySpark DataFrame 并讨论从该 DataFrame 中删除重复行的不同方法。让我们了解 PySpark DataFrame 的概念。
创建和理解 PySpark DataFrame
与任何其他 DataFrame 一样,PySpark 以表格方式存储数据。它允许程序员处理结构化和半结构化数据,并提供高级 API(Python、Java)来处理复杂的数据集。它可以非常快速地分析数据,因此它在流处理和批处理中都非常有用。
既然我们已经讨论了 PySpark DataFrame 的基础知识,让我们使用 Python 代码创建一个。我们将创建一个包含与不同赛车手相关的信息的 PySpark DataFrame。
示例
我们导入了必要的库,包括“pandas”和“pyspark”。我们还导入了一个名为“SparkSession”的统一接口。
此接口确保 Spark 框架正常运行。它充当 Spark API 的“入口点”,从而提高数据处理效率。简而言之,我们创建 SparkSession 以设置所需的配置。
在此之后,我们使用“builder”API 创建了此 SparkSession 的实例。我们还使用了“getorCreate()”方法来锁定现有会话或将其替换为新的会话。
完成配置部分后,我们准备了一个包含不同汽车特征的数据集字典。我们使用此数据集生成了一个 Pandas DataFrame。
生成的 5X3 DataFrame 存储在“dataframe_pd”变量中。此变量作为“SparkSession”的“createDataFrame()”方法的参数传递,以创建 PySpark DataFrame。
我们使用 Pandas DataFrame 生成了 PySpark DataFrame,但这并非强制步骤。我们可以直接使用元组列表来创建数据集,然后将其传递给“createDataFrame()”方法。
最后,我们使用“dataframe_spk.show()”方法显示了 DataFrame。
示例
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.getOrCreate()
dataset = {"Carname":["Audi", "Mercedes", "BMW", "Audi", "Audi"], "Max Speed": ["300 KPH", "250 KPH", "220 KPH", "300 KPH", "300 KPH"], "Car number":["MS321", "QR345", "WX281", "MS321", "MS321"]}
dataframe_pd = pd.DataFrame(dataset, index= ["Racer1", "Racer2", "Racer3", "Racer1", "Racer1"])
dataframe_spk = spark.createDataFrame(dataframe_pd)
print("The original data frame is like: -")
dataframe_spk.show()
输出
The original data frame is like: -
Carname Max Speed Car number
Audi 300 KPH MS321
Mercedes 250 KPH QR345
BMW 220 KPH WX281
Audi 300 KPH MS321
Audi 300 KPH MS321
既然我们创建了一个 PySpark DataFrame,让我们讨论从该 DataFrame 中删除行的不同方法。
使用 Distinct() 函数删除行
此函数返回一个包含不同或唯一行的新的 DataFrame。它消除了 DataFrame 中的所有重复行。
示例
我们不为该函数传递任何参数。让我们看看它的实现。
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.getOrCreate()
dataset = {"Carname":["Audi", "Mercedes", "BMW", "Audi", "Audi"], "Max Speed": ["300 KPH", "250 KPH", "220 KPH", "300 KPH", "300 KPH"], "Car number":["MS321", "QR345", "WX281", "MS321", "MS321"]}
dataframe_pd = pd.DataFrame(dataset)
dataframe_spk = spark.createDataFrame(dataframe_pd)
print("The original data frame is like: -")
dataframe_spk.show()
print("After dropping the duplicate rows we get: -")
dataframe_spk.distinct().show()
输出
The original data frame is like: -
Carname Max Speed Car number
Audi 300 KPH MS321
Mercedes 250 KPH QR345
BMW 220 KPH WX281
Audi 300 KPH MS321
Audi 300 KPH MS321
After dropping the duplicate rows we get: -
Carname Max Speed Car number
Mercedes 250 KPH QR345
BMW 220 KPH WX281
Audi 300 KPH MS321
创建 PySpark DataFrame 后,我们使用distinct()函数来定位唯一行并将其从 DataFrame 中删除。
使用 dropDuplicate() 函数
这是一种替代方法,其工作方式与distinct()函数相同。我们可以定位列并相应地删除行。让我们构建代码。
示例
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.getOrCreate()
dataset = {"Carname":["Audi", "Mercedes", "BMW", "Audi", "Audi"], "Max Speed": ["300 KPH", "250 KPH", "220 KPH", "300 KPH", "300 KPH"], "Car number":["MS321", "QR345", "WX281", "MS321", "MS321"]}
dataframe_pd = pd.DataFrame(dataset)
dataframe_spk = spark.createDataFrame(dataframe_pd)
print("The original data frame is like: -")
dataframe_spk.show()
print("After dropping the duplicate rows we get: -")
dataframe_spk.dropDuplicates().show()
输出
The original data frame is like: - +--------+---------+----------+ | Carname|Max Speed|Car number| +--------+---------+----------+ | Audi| 300 KPH| MS321| |Mercedes| 250 KPH| QR345| | BMW| 220 KPH| WX281| | Audi| 300 KPH| MS321| | Audi| 300 KPH| MS321| +--------+---------+----------+ After dropping the duplicate rows we get: - +--------+---------+----------+ | Carname|Max Speed|Car number| +--------+---------+----------+ |Mercedes| 250 KPH| QR345| | Audi| 300 KPH| MS321| | BMW| 220 KPH| WX281| +--------+---------+----------+
定位特定列
我们可以借助“select()”函数检查特定列的重复值。我们将对选定的列使用 dropDuplicate() 函数。
示例
dataframe_spk.select(["Carname"]).dropDuplicates().show()
输出
+--------+ | Carname| +--------+ |Mercedes| | BMW| | Audi| +--------+
结论
本文解释了删除“列”值包含任何重复或重复数据的行的基本操作。我们讨论了涉及的不同函数,包括“dropDuplicate()”、“Distinct()”和“select()”。我们创建了一个参考 DataFrame 并从中删除了重复值。
数据结构
网络
关系数据库管理系统
操作系统
Java
iOS
HTML
CSS
Android
Python
C 语言编程
C++
C#
MongoDB
MySQL
Javascript
PHP