如何在PySpark中按值排序?
PySpark是一个分布式数据处理引擎,用于编写API代码。PySpark是Apache Spark和Python的结合。Spark是一个大规模数据处理平台,能够处理PB级数据。在Python中,PySpark内置函数如orderBy()、sort()、sortBy()、createDataFrame()、collect()和asc_nulls_last()可用于对值进行排序。
语法
以下语法用于示例:
createDataFrame()
这是Python中的一个内置函数,它表示从PySpark模块创建DataFrame的另一种方法。
orderBy()
这是Python中的内置方法,它遵循PySpark模块来表示一列或多列的升序或降序。
sort()
此方法用于列出升序。如果我们在sort函数上设置任何条件,则允许我们以降序工作。
sortBy()
Python中的sortBy()方法遵循sparkContext,可用于按有序序列对数据进行排序。
parallelize()
此方法包含在sparkContext中,允许数据分布在多个节点上。
collect()
这通常被称为PySpark RDD或DataFrame collect,可用于访问数据集中的所有项目或数字。
asc_nulls_last("column_name")
这是Python中的一个内置函数,它返回升序序列。
所需安装:
pip install pyspark
此命令有助于基于PySpark运行程序。
示例1
在以下示例中,我们将展示**如何按单个列对DataFrame进行排序**。首先,开始导入名为pyspark.sql.SparkSession的模块。然后创建一个名为SparkSession的对象。然后将元组列表存储在变量spark中。接下来,使用spark.createDataFrame()创建一个DataFrame,并提供数据和列名。接下来,在DataFrame上使用orderBy()函数按所需的列(在本例中为“Age”)进行排序。最后,使用show方法使用排序后的DataFrame来获得最终结果。
from pyspark.sql import SparkSession # Creation of SparkSession spark = SparkSession.builder.getOrCreate() # Creation of DataFrame stu_data = [("Akash", 25), ("Bhuvan", 23), ("Peter", 18), ("Mohan", 26)] df = spark.createDataFrame(stu_data, ["Name", "Age"]) # Sorting of Dataframe column(Age) in ascending order sorted_df = df.orderBy("Age") # sorted DataFrame sorted_df.show()
输出
+------+---+ | Name|Age| +------+---+ | Peter| 18| |Bhuvan| 23| | Akash| 25| | Mohan| 26| +------+---+
示例2
在以下示例中,我们将展示如何按多列对DataFrame进行排序。这里它使用orderBy方法,该方法接受两个参数- list(设置列名)和ascending(将值设置为true,允许排序序列),并将其存储在变量sorted_df中。然后使用show()方法和sorted_df获得结果。
from pyspark.sql import SparkSession # Create SparkSession spark = SparkSession.builder.getOrCreate() # Create a DataFrame data = [("Umbrella", 125), ("Bottle", 20), ("Colgate", 118)] df = spark.createDataFrame(data, ["Product", "Price"]) # Sort DataFrame by product and price in ascending order sorted_df = df.orderBy(["Price", "Product"], ascending=[True, True]) # Show the sorted DataFrame sorted_df.show()
输出
+--------+-----+ | Product|Price| +--------+-----+ | Bottle| 20| | Colgate| 118| |Umbrella| 125| +--------+-----+
示例3
在以下示例中,我们将展示如何以降序对DataFrame进行排序。这里它使用内置函数createDataframe手动将DataFrame设置为二维结构。然后初始化名为sorted_df的变量,该变量使用名为sort()的方法存储值[sort函数的参数是内置函数desc(“Age”),它将遵循降序序列]。最后,我们使用sorted_df.show()打印结果。
from pyspark.sql import SparkSession from pyspark.sql.functions import desc # Creation of SparkSession spark = SparkSession.builder.getOrCreate() # Creation of DataFrame Emp_data = [("Abhinav", 25, "Male"), ("Meera", 32, "Female"), ("Riya", 18, "Female"), ("Deepak", 33, "Male"), ("Elon", 50, "Male")] df = spark.createDataFrame(Emp_data, ["Name", "Age", "Gender"]) # Sort DataFrame by Age in descending order sorted_df = df.sort(desc("Age")) # Show the sorted DataFrame sorted_df.show()
输出
+-------+---+------+ | Name|Age|Gender| +-------+---+------+ | Elon| 50| Male| | Deepak| 33| Male| | Meera| 32|Female| |Abhinav| 25| Male| | Riya| 18|Female| +-------+---+------+
示例4
在以下示例中,**我们将展示如何按值对RDD进行排序**。这里我们从元组列表创建一个RDD。然后在RDD上使用sortBy()函数,并提供一个lambda函数,该函数提取要按每个元组的第二个元素排序的值。接下来,收集并迭代排序后的RDD以访问排序后的记录。
# Sorting RDD by Value from pyspark.sql import SparkSession # Creation of SparkSession spark = SparkSession.builder.getOrCreate() # Creation of RDD data = [("X", 25), ("Y", 32), ("Z", 18)] rdd = spark.sparkContext.parallelize(data) # Sort RDD by value in ascending order sorted_rdd = rdd.sortBy(lambda x: x[1]) # Print the sorted RDD for record in sorted_rdd.collect(): print(record)
输出
('Z', 18) ('X', 25) ('Y', 32)
示例5
在以下示例中,我们将展示**如何对包含空值的DataFrame进行排序**。这里,它使用DataFrame上的sort()函数按“Price”列升序排序,其中包含空值,即none。接下来,它将asc_nulls_last(“Price”)作为参数传递给sort(),然后指定升序到降序的排序顺序。然后将排序后的DataFrame分配给变量sorted_df,然后使用show()方法和相同的变量来获得结果。
# Sorting DataFrame with Null Values from pyspark.sql import SparkSession from pyspark.sql.functions import asc_nulls_last # Creation of SparkSession spark = SparkSession.builder.getOrCreate() # Creation of DataFrame with null values data = [("Charger", None), ("Mouse", 320), ("PEN", 18), ("Bag", 1000), ("Notebook", None)] # None = null df = spark.createDataFrame(data, ["Product", "Price"]) # Sorting of DataFrame column(Price) in ascending order with null values last sorted_df = df.sort(asc_nulls_last("Price")) # Show the sorted DataFrame sorted_df.show()
输出
+--------+-----+ | Product|Price| +--------+-----+ | PEN| 18| | Mouse| 320| | Bag| 1000| | Charger| null| |Notebook| null| +--------+-----+
结论
我们讨论了在PuSpark中对值进行排序的不同方法。我们使用了一些内置函数,如orderBy()、sort()、asc_nulls_last()等。排序的目的是获得升序或降序的序列顺序。PySpark的各种应用,例如实时库、大规模数据处理和构建API。