如何在PySpark中按值排序?


PySpark是一个分布式数据处理引擎,用于编写API代码。PySpark是Apache Spark和Python的结合。Spark是一个大规模数据处理平台,能够处理PB级数据。在Python中,PySpark内置函数如orderBy()、sort()、sortBy()、createDataFrame()、collect()和asc_nulls_last()可用于对值进行排序。

语法

以下语法用于示例:

createDataFrame()

这是Python中的一个内置函数,它表示从PySpark模块创建DataFrame的另一种方法。

orderBy()

这是Python中的内置方法,它遵循PySpark模块来表示一列或多列的升序或降序。

sort()

此方法用于列出升序。如果我们在sort函数上设置任何条件,则允许我们以降序工作。

sortBy()

Python中的sortBy()方法遵循sparkContext,可用于按有序序列对数据进行排序。

parallelize() 

此方法包含在sparkContext中,允许数据分布在多个节点上。

collect()

这通常被称为PySpark RDD或DataFrame collect,可用于访问数据集中的所有项目或数字。

asc_nulls_last("column_name")

这是Python中的一个内置函数,它返回升序序列。

所需安装:

pip install pyspark

此命令有助于基于PySpark运行程序。

示例1

在以下示例中,我们将展示**如何按单个列对DataFrame进行排序**。首先,开始导入名为pyspark.sql.SparkSession的模块。然后创建一个名为SparkSession的对象。然后将元组列表存储在变量spark中。接下来,使用spark.createDataFrame()创建一个DataFrame,并提供数据和列名。接下来,在DataFrame上使用orderBy()函数按所需的列(在本例中为“Age”)进行排序。最后,使用show方法使用排序后的DataFrame来获得最终结果。

from pyspark.sql 
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame
stu_data = [("Akash", 25), ("Bhuvan", 23), ("Peter", 18), ("Mohan", 26)]
df = spark.createDataFrame(stu_data, ["Name", "Age"])

# Sorting of Dataframe column(Age) in ascending order
sorted_df = df.orderBy("Age")

# sorted DataFrame
sorted_df.show()

输出

+------+---+
|  Name|Age|
+------+---+
| Peter| 18|
|Bhuvan| 23|
| Akash| 25|
| Mohan| 26|
+------+---+

示例2

在以下示例中,我们将展示如何按多列对DataFrame进行排序。这里它使用orderBy方法,该方法接受两个参数- list(设置列名)和ascending(将值设置为true,允许排序序列),并将其存储在变量sorted_df中。然后使用show()方法和sorted_df获得结果。

from pyspark.sql 
import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("Umbrella", 125), ("Bottle", 20), ("Colgate", 118)]
df = spark.createDataFrame(data, ["Product", "Price"])

# Sort DataFrame by product and price in ascending order
sorted_df = df.orderBy(["Price", "Product"], ascending=[True, True])

# Show the sorted DataFrame
sorted_df.show()

输出

+--------+-----+
| Product|Price|
+--------+-----+
|  Bottle|   20|
| Colgate|  118|
|Umbrella|  125|
+--------+-----+

示例3

在以下示例中,我们将展示如何以降序对DataFrame进行排序。这里它使用内置函数createDataframe手动将DataFrame设置为二维结构。然后初始化名为sorted_df的变量,该变量使用名为sort()的方法存储值[sort函数的参数是内置函数desc(“Age”),它将遵循降序序列]。最后,我们使用sorted_df.show()打印结果。

from pyspark.sql 
import SparkSession
from pyspark.sql.functions 
import desc
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame
Emp_data = [("Abhinav", 25, "Male"), ("Meera", 32, "Female"), ("Riya", 18, "Female"), ("Deepak", 33, "Male"), ("Elon", 50, "Male")]
df = spark.createDataFrame(Emp_data, ["Name", "Age", "Gender"])

# Sort DataFrame by Age in descending order
sorted_df = df.sort(desc("Age"))

# Show the sorted DataFrame
sorted_df.show()

输出

+-------+---+------+
|   Name|Age|Gender|
+-------+---+------+
|   Elon| 50|  Male|
| Deepak| 33|  Male|
|  Meera| 32|Female|
|Abhinav| 25|  Male|
|   Riya| 18|Female|
+-------+---+------+

示例4

在以下示例中,**我们将展示如何按值对RDD进行排序**。这里我们从元组列表创建一个RDD。然后在RDD上使用sortBy()函数,并提供一个lambda函数,该函数提取要按每个元组的第二个元素排序的值。接下来,收集并迭代排序后的RDD以访问排序后的记录。

# Sorting RDD by Value
from pyspark.sql 
import SparkSession

# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of RDD
data = [("X", 25), ("Y", 32), ("Z", 18)]
rdd = spark.sparkContext.parallelize(data)

# Sort RDD by value in ascending order
sorted_rdd = rdd.sortBy(lambda x: x[1])

# Print the sorted RDD
for record in sorted_rdd.collect():
   print(record)

输出

('Z', 18)
('X', 25)
('Y', 32)

示例5

在以下示例中,我们将展示**如何对包含空值的DataFrame进行排序**。这里,它使用DataFrame上的sort()函数按“Price”列升序排序,其中包含空值,即none。接下来,它将asc_nulls_last(“Price”)作为参数传递给sort(),然后指定升序到降序的排序顺序。然后将排序后的DataFrame分配给变量sorted_df,然后使用show()方法和相同的变量来获得结果。

# Sorting DataFrame with Null Values
from pyspark.sql 
import SparkSession
from pyspark.sql.functions 
import asc_nulls_last

# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame with null values
data = [("Charger", None), ("Mouse", 320), ("PEN", 18), ("Bag", 1000), ("Notebook", None)] # None = null
df = spark.createDataFrame(data, ["Product", "Price"])

# Sorting of DataFrame column(Price) in ascending order with null values last
sorted_df = df.sort(asc_nulls_last("Price"))

# Show the sorted DataFrame
sorted_df.show()

输出

+--------+-----+
| Product|Price|
+--------+-----+
|     PEN|   18|
|   Mouse|  320|
|     Bag| 1000|
| Charger| null|
|Notebook| null|
+--------+-----+

结论

我们讨论了在PuSpark中对值进行排序的不同方法。我们使用了一些内置函数,如orderBy()、sort()、asc_nulls_last()等。排序的目的是获得升序或降序的序列顺序。PySpark的各种应用,例如实时库、大规模数据处理和构建API。

更新于:2023年7月17日

631 次浏览

开启你的职业生涯

通过完成课程获得认证

开始学习
广告