使用Dask进行并行计算
Dask是一个灵活的开源Python库,用于并行计算。在本文中,我们将学习并行计算以及为什么我们应该选择Dask来实现此目的。
我们将将其与Spark、Ray和Modin等其他库进行比较。我们还讨论了Dask的用例。
并行计算
并行计算是一种计算类型,它同时执行多个计算或进程。大型问题通常被分解成更小的、可以独立解决的部分。
并行计算的四大类是:
位级
指令级
数据级
作业并行。
虽然并行性长期以来一直被用于高性能计算,但由于频率扩展的物理限制,它最近才变得越来越流行。
Dask的需求
一个问题是,我们为什么需要Dask?
借助NumPy、Sklearn、Seaborn等Python库,数据操作和机器学习任务变得简单。对于大多数数据分析任务来说,Python的[pandas]模块已经足够了。可以使用多种方法操作数据,并使用这些数据创建机器学习模型。
但是,如果您的数据超过可用RAM的大小,pandas将变得不够用。这是一个非常普遍的问题。您可以使用Spark或Hadoop来解决这个问题。但是,这些都不是Python环境。因此,您无法使用NumPy、pandas、sklearn、TensorFlow和其他知名的Python机器学习工具。有没有办法解决这个问题?是的!这时Dask就派上用场了。
Dask简介
Dask是一个用于并行计算的框架,可以与Jupyter Notebook无缝集成。最初,它是为了扩展NumPy、Pandas和Scikit-learn的计算能力而创建的,以克服单机存储的限制。可以使用DASK类似物进行学习,但很快它就被用作通用的分布式系统。
Dask有两个主要优势:
可扩展性
Dask与Pandas、NumPy和Scikit-Learn的Python版本原生兼容,并且可以在具有多个内核的集群上可靠地运行。它也可以缩小规模以在单个系统上运行。
规划
类似于Airflow、Luigi,Dask任务调度器针对计算进行了优化。它提供快速反馈,使用任务图管理任务,并支持本地和分布式诊断,使其动态且响应迅速。
此外,Dask提供了一个实时动态仪表板,每100毫秒更新一次,显示进度、内存使用情况等各种信息。
根据您的喜好,您可以克隆git仓库或使用Conda/pip安装Dask。
conda install dask
仅安装核心:
conda install dask-core
Dask-core是Dask的精简版本,只安装核心组件。pip也一样。如果您只关心使用dask数据框和dask数组来扩展pandas、numpy或两者,您也可以只安装dask数据框或dask数组。
python -m pip install dask
安装数据框所需组件
python -m pip install "dask[dataframe]" #
安装数组所需组件
python -m pip install "dask[list]"
让我们看看这个库用于并行计算的几个例子。我们的代码使用dask.delayed来实现并行性。
注意:下面的两个代码片段应该在Jupyter Notebook的两个不同的单元格中运行
import time import random def calcprofit(a, b): time.sleep(random.random()) return a + b def calcloss(a, b): time.sleep(random.random()) return a - b def calctotal(a, b): time.sleep(random.random()) return a + b
现在运行下面的代码片段:
%%time profit = calcprofit(10, 22) loss = calcloss(18, 3) total = calctotal(profit, loss) print(total)
输出
47 CPU times: user 4.13 ms, sys: 1.23 ms, total: 5.36 ms Wall time: 1.35 s
尽管它们相互独立,但这些函数将按顺序一个接一个地执行。因此,我们可以同时执行它们以节省时间。
import dask calcprofit = dask.delayed(calcprofit) calcloss = dask.delayed(calcloss) calctotal = dask.delayed(calctotal)
现在运行下面的代码片段:
%%time profit = calcprofit(10, 22) loss = calcloss(18, 3) total = calctotal(profit, loss) print(total)
输出
Delayed('calctotal-9e3e896e-b4de-400c-aeb8-9e4c0961fe11') CPU times: user 3.3 ms, sys: 0 ns, total: 3.3 ms Wall time: 10.2 ms
即使在这个简单的例子中,运行时间也得到了改善。我们可以按照如下方式查看任务图:
total.visualize(rankdir='LR')
Spark vs. Dask
Spark是一个强大的集群计算框架工具,它将数据和处理分成可管理的部分,将它们分布在任何大小的集群上,并同时执行它们。
尽管Spark是大数据分析的事实上的标准技术,但Dask看起来非常有前景。Dask是轻量级的,并作为Python组件开发,而Spark具有额外的功能,主要用Scala开发,也支持Python/R。如果您想要一个切实可行的解决方案,或者甚至拥有JVM基础设施,Spark可以是您的首选。但是,如果您想要快速、轻量级的并行处理,Dask是一个可行的选择。快速pip install后即可使用。
Dask、Ray和Modin
Ray和Dask具有不同的调度策略。Dask使用中央调度器管理集群的所有作业。由于Ray是去中心化的,每台计算机都有自己的调度器,允许在特定机器级别而不是整个集群级别解决计划任务的问题。Ray缺乏Dask提供的丰富的上层集合API(例如数据框、分布式数组等)。
另一方面,Modin运行在Dask或Ray之上。只需添加一行代码`import modin.pandas as pd`,我们就可以快速地使用Modin扩展我们的Pandas进程。虽然Modin努力尽可能地并行化Pandas API的大部分,但Dask DataFrame有时不会扩展完整的Pandas API。
Dask用例示例
Dask的应用案例分为两类:
我们可以使用动态任务调度来优化计算。
可以使用“大数据”集合(例如并行数组和数据框)来处理大型数据集。
使用Dask集合创建任务图,这是数据处理作业组织的可视化表示。
使用Dask调度器执行任务图。
Dask使用并行编程来执行作业。
术语“并行编程”是指同时执行多个任务。
通过这样做,我们可以有效地利用我们的资源,并同时完成多个任务。
让我们看看Dask提供的一些数据集。
Dask.array - 使用NumPy接口,dask.array将大型数组分成较小的数组,使我们能够对大于系统内存的数组进行计算。
Dask.bag - 它对标准Python对象的集合执行操作,例如filter、map、group by和fold。
Dask.dataframe - 分布式数据框,类似于Pandas。它是由多个较小的数据框构成的大型并行数据框。
结论
在本文中,我们学习了Dask和并行计算。希望它能帮助您提高对Dask的了解,包括它的需求以及与其他库的比较。