使用Dask进行并行计算


Dask是一个灵活的开源Python库,用于并行计算。在本文中,我们将学习并行计算以及为什么我们应该选择Dask来实现此目的。

我们将将其与Spark、Ray和Modin等其他库进行比较。我们还讨论了Dask的用例。

并行计算

并行计算是一种计算类型,它同时执行多个计算或进程。大型问题通常被分解成更小的、可以独立解决的部分。

并行计算的四大类是:

  • 位级

  • 指令级

  • 数据级

  • 作业并行。

虽然并行性长期以来一直被用于高性能计算,但由于频率扩展的物理限制,它最近才变得越来越流行。

Dask的需求

一个问题是,我们为什么需要Dask?

借助NumPy、Sklearn、Seaborn等Python库,数据操作和机器学习任务变得简单。对于大多数数据分析任务来说,Python的[pandas]模块已经足够了。可以使用多种方法操作数据,并使用这些数据创建机器学习模型。

但是,如果您的数据超过可用RAM的大小,pandas将变得不够用。这是一个非常普遍的问题。您可以使用Spark或Hadoop来解决这个问题。但是,这些都不是Python环境。因此,您无法使用NumPy、pandas、sklearn、TensorFlow和其他知名的Python机器学习工具。有没有办法解决这个问题?是的!这时Dask就派上用场了。

Dask简介

Dask是一个用于并行计算的框架,可以与Jupyter Notebook无缝集成。最初,它是为了扩展NumPy、Pandas和Scikit-learn的计算能力而创建的,以克服单机存储的限制。可以使用DASK类似物进行学习,但很快它就被用作通用的分布式系统。

Dask有两个主要优势:

可扩展性

Dask与Pandas、NumPy和Scikit-Learn的Python版本原生兼容,并且可以在具有多个内核的集群上可靠地运行。它也可以缩小规模以在单个系统上运行。

规划

类似于Airflow、Luigi,Dask任务调度器针对计算进行了优化。它提供快速反馈,使用任务图管理任务,并支持本地和分布式诊断,使其动态且响应迅速。

此外,Dask提供了一个实时动态仪表板,每100毫秒更新一次,显示进度、内存使用情况等各种信息。

根据您的喜好,您可以克隆git仓库或使用Conda/pip安装Dask。

conda install dask

仅安装核心:

conda install dask-core

Dask-core是Dask的精简版本,只安装核心组件。pip也一样。如果您只关心使用dask数据框和dask数组来扩展pandas、numpy或两者,您也可以只安装dask数据框或dask数组。

python -m pip install dask

安装数据框所需组件

python -m pip install "dask[dataframe]" #

安装数组所需组件

python -m pip install "dask[list]"

让我们看看这个库用于并行计算的几个例子。我们的代码使用dask.delayed来实现并行性。

注意:下面的两个代码片段应该在Jupyter Notebook的两个不同的单元格中运行

import time import random def calcprofit(a, b): time.sleep(random.random()) return a + b def calcloss(a, b): time.sleep(random.random()) return a - b def calctotal(a, b): time.sleep(random.random()) return a + b

现在运行下面的代码片段:

%%time profit = calcprofit(10, 22) loss = calcloss(18, 3) total = calctotal(profit, loss) print(total)

输出

47
CPU times: user 4.13 ms, sys: 1.23 ms, total: 5.36 ms
Wall time: 1.35 s

尽管它们相互独立,但这些函数将按顺序一个接一个地执行。因此,我们可以同时执行它们以节省时间。

import dask calcprofit = dask.delayed(calcprofit) calcloss = dask.delayed(calcloss) calctotal = dask.delayed(calctotal)

现在运行下面的代码片段:

%%time profit = calcprofit(10, 22) loss = calcloss(18, 3) total = calctotal(profit, loss) print(total)

输出

Delayed('calctotal-9e3e896e-b4de-400c-aeb8-9e4c0961fe11')
CPU times: user 3.3 ms, sys: 0 ns, total: 3.3 ms
Wall time: 10.2 ms

即使在这个简单的例子中,运行时间也得到了改善。我们可以按照如下方式查看任务图:

total.visualize(rankdir='LR')

Spark vs. Dask

Spark是一个强大的集群计算框架工具,它将数据和处理分成可管理的部分,将它们分布在任何大小的集群上,并同时执行它们。

尽管Spark是大数据分析的事实上的标准技术,但Dask看起来非常有前景。Dask是轻量级的,并作为Python组件开发,而Spark具有额外的功能,主要用Scala开发,也支持Python/R。如果您想要一个切实可行的解决方案,或者甚至拥有JVM基础设施,Spark可以是您的首选。但是,如果您想要快速、轻量级的并行处理,Dask是一个可行的选择。快速pip install后即可使用。

Dask、Ray和Modin

Ray和Dask具有不同的调度策略。Dask使用中央调度器管理集群的所有作业。由于Ray是去中心化的,每台计算机都有自己的调度器,允许在特定机器级别而不是整个集群级别解决计划任务的问题。Ray缺乏Dask提供的丰富的上层集合API(例如数据框、分布式数组等)。

另一方面,Modin运行在Dask或Ray之上。只需添加一行代码`import modin.pandas as pd`,我们就可以快速地使用Modin扩展我们的Pandas进程。虽然Modin努力尽可能地并行化Pandas API的大部分,但Dask DataFrame有时不会扩展完整的Pandas API。

Dask用例示例

Dask的应用案例分为两类:

  • 我们可以使用动态任务调度来优化计算。

  • 可以使用“大数据”集合(例如并行数组和数据框)来处理大型数据集。

使用Dask集合创建任务图,这是数据处理作业组织的可视化表示。

使用Dask调度器执行任务图。

Dask使用并行编程来执行作业。

术语“并行编程”是指同时执行多个任务。

通过这样做,我们可以有效地利用我们的资源,并同时完成多个任务。

让我们看看Dask提供的一些数据集。

  • Dask.array - 使用NumPy接口,dask.array将大型数组分成较小的数组,使我们能够对大于系统内存的数组进行计算。

  • Dask.bag - 它对标准Python对象的集合执行操作,例如filter、map、group by和fold。

  • Dask.dataframe - 分布式数据框,类似于Pandas。它是由多个较小的数据框构成的大型并行数据框。

结论

在本文中,我们学习了Dask和并行计算。希望它能帮助您提高对Dask的了解,包括它的需求以及与其他库的比较。

更新于:2023年1月9日

493 次浏览

开启你的职业生涯

通过完成课程获得认证

开始学习
广告