《使⽤Python和Dask实现分布式并⾏计the event
算》2.IntroducingDask(介绍Dask)
楔⼦
现在相信你已经对DAG的⼯作原理有了基本的理解,那么下⾯来看看Dask如何使⽤DAG来创建健壮的、可扩展的workload(控制器)。
下⾯我们要完成两件事:使⽤Dask的DataFrame API来分析结构化数据集;研究⼀些有⽤的诊断⼯具,并使⽤low-level Delayed API来创建⼀个简单的⾃定义任务图。
import sys
import dask
rhodesiaprint(dask.__version__) # 2.28.0
print(sys.version[: 5]) # 3.8.1
你好Dask:看⼀下你的DataFrame API
任何数据科学项⽬的⼀个基本步骤都是对数据集执⾏探索性分析,在探索性分析期间,你需要检查数据是否存在缺失值、异常值和任何其他数据质量问题。如果出现了脏数据,需要对其进⾏清洗,以确保对数据所做的任何结论不会受到错误或异常数据的影响。在使⽤Dask DataFrame API的第⼀个⽰例中,我们将逐步读取数据⽂件,并扫描数据以查找缺失值,并删除由于丢失太多数据或者对分析没有帮助的列。
检测Dask对象的元数据(metadata)
没有特殊说明,我们的代码都是在jupyter notebook上⾯运⾏。
import dask.dataframe as dd
df = dd.read_csv(r"C:\Urs\satori\Desktop\item\item.csv")
df
如果你是⼀个经验丰富的pandas使⽤者,那么你会发现上⾯的代码⾮常的熟悉,因为它们在语法上是等价的。但是结果却不⼀定是你熟悉的,pandas的DataFrame对象在打印的时候会尝试将数据的开头和结尾显⽰出来,最上⽅是列名,左侧是索引。但是我们看到Dask DataFrame在打印的时候会显⽰元数据,列名在顶部,⽽下⽅则是每个列各⾃的数据类型,因为Dask会⾮常努⼒、并且智能地从数据
中推断数据类型,就像pandas所做的那样。但是Dask在这⽅⾯的能⼒也是会受到限制的,Dask是⽤来处理不能被单机读取的中⼤型数据集的,⽽pandas是完全基于内存操作,所以它可以轻松⽽快速地扫描整个数据集,从⽽判断每个列的最佳数据类型。另⼀⽅⾯,Dask必须能够很好的处理分散在分布式⽂件系统中多个机器的数据集,因此Dask DataFrame使⽤随机抽样的⽅法从⼀⼩部分数据样本中分析和推断数据类型。但是这样有⼀个问题,如果数百万或数⼗亿⾏中只有⼀个异常⾏,那么这个异常⾏就不太可能被随机挑选出来,这将导致Dask选择不兼容的数据类型,可能会导致在后⾯的执⾏计算中出现错误。因此避免这种情况的最佳实践是显式地设置数据类型,⽽不是依赖于Dask的推断。
事实上这⼀点很好理解,因为pandas是将数据全部读取到内存中,即便是分块,它最终也是可以选择⼀个合适的类型。但是对于Dask⽽⾔,显然是没有办法这么做的,因为数据量就已经决定了光靠随机采样是达不到百分之百的准确率。
或者更好的做法是使⽤能够表⽰数据类型的⼆进制⽂件(⽐如:Parquet),类型是什么在⽂件中进⾏体现,这样就完全可以避免类型推断带来的问题。我们将在后续系列来讨论这个问题,⽬前我们就让Dask⾃⼰推断数据类型。
显然我们来看看Dask DataFrame的输出,因为这和pandas DataFrame的输出还是有很⼤差别的。Dask DataFrame显⽰的是元数据信息,它告诉了我们Dask的任务调度器在处理该⽂件时是如何对任务进⾏分解的。
npartitions显⽰了将DataFrame分隔成多少个分区,这⾥是65个,由于该⽂件的⼤⼩是3.85GB,在65个分区中,每个分区的⼤⼩⼤概是60.65MB。这意味着Dask不是⼀次性将⽂件加载到内存中,⽽是每个Dask⼯作线程⼀次处理⼀个60.65MB的⽂件块。⽽且dd.read_csv在读取这个⼤⽂件时,⼏乎是瞬间完成的,这也说明了该函数在读取csv⽂件时会执⾏懒加载。Dask将⽂件分成可以独⽴处理的多个⼩块,⽽不是马上将整个⽂件读取到内存中,⽽这些块就叫做分区。⽽Dask
DataFrame中的每⼀个分区,都相当于是⼀个较⼩的pandas DataFrame。
所以Dask是将⼤⽂件分割成多个分区,⼀次处理⼀个分区。
yet图中的Dask DataFrame是由两个分区组成,因此单个Dask DataFrame由两个较⼩的pandas DataFrame组成,每个分区都可以加载到内存中进⾏处理。⼯作节点先拾取分区1进⾏处理,然后将结果保存在临时存储空间中。然后拾取分区2进⾏处理,也将结果保存在⼀个临时空间中,然后将两个结果合并并返回给我们。因为⼯作节点⼀次可以处理较⼩的数据⽚段,所以可以将⼯作分配个多个机器,或者在本地的情况下,也可以在⾮常⼤的数据集上继续⼯作,⽽不会导致内存不⾜产⽣的错误(内存溢出)。
注意df返回的元数据,我们还有⼀个信息没有说,就是最下⾯的65 tasks,这表⽰Dask DataFrame由65个任务组成。这表⽰Dask创建了⼀个有65个节点的有向⽆环图来处理数据。⽽我们恰好也有65个
分区,表⽰每个分区⾥⾯有⼀个任务,注意:分区数和任务数不⼀定是⼀致的,因为⼀个分区可以对应多个任务。我们这⾥是65个分区,所以如果有65个worker的话,那么可以同时处理整个⽂件,但如果只有⼀个worker,那么Dask将依次循环遍历每⼀个分区。现在让我们尝试计算⼀下,整个⽂件中每个列的缺失值。
# 可以看到使⽤的api和pandas的DataFrame基本是⼀致的
missing_value = df.isnull().sum()
"""
Dask Series Structure:
npartitions=1topik成绩查询
en_description int64
item_id ...
dtype: int64
Dask Name: dataframe-sum-agg, 196 tasks
"""
虽然在计算的⽅法上和pandas是⼀致的,但是结果和之前⼀样,返回的Series对象并没有给出我们期望的输出。返回的不是缺失值的统计值,⽽是⼀些关于预期结果的元数据信息。⽽且通过输出信息我们发现返回的结果看起来像是⼀系列int64,但是实际数据在哪⾥呢?其实Dask还没有进⾏处理,因为采⽤的是惰性计算。这意味着Dask实际上做的是准备⼀个DAG,然后存储在missing_value变量中,在显式执⾏任务图之前,不会计算数据。这种⾏为使得快速构建复杂任务图成为可能,⽽不必等待每个中间步骤完成,通过返回信息的最后⼀⾏我们知道此时的任务数量已经增加到了196。Dask从DAG中获取了前65个任务,这些任务⽤于读取数据⽂件创建DataFrame,然后DataFrame⼜添加了131个任务来检查null值以及计算sum,最终将所有部分收集到⼀个单⼀的Series对象中并返回答案。
missing_count = ((missing_value / df.index.size) * 100)
missing_count
"""
Dask Series Structure:
npartitions=1
hook是什么意思en_description float64
item_id ...
dtype: float64
Dask Name: mul, 329 tasks
"""
在执⾏计算之前,我们要求Dask将缺失值的数量转成百分⽐,显然要除以DataFrame的总⾏数,再乘以100。注意:任务数量增加的同时,返回的Series对象的数据类型也从int64变成了float64。这是因为触发操作的结果不是整数,因此Dask⾃动将结果转换为浮点数,正如Dask尝试从⽂件中推断数据类型⼀样,它也会尝试推断某个操作如何影响输出的数据类型。由于我们已经像DAG中添加了⼀个⽤于两个数相除的操作,Dask推断我们可能会从整数移动到浮点数,并相应的改变元数据。
⽤compute⽅法运⾏计算
现在我们准备运⾏并⽣成结果了。
from dask.diagnostics import ProgressBar # 绘制进度条
# 在计算的时候会⾃动显⽰进度条爱华英语
with ProgressBar():
3345
missing_count_pct = pute()
missing_count_pct
结果得到的是⼀个pandas的Series对象,也和我们平时使⽤pandas得到的结果也是⼀样的,我们看到总共花费了37.7s。所以当你想要得到计算结果时,需要调⽤compute⽅法,这会告诉Dask我们需要你开始真正地执⾏了。我们看到这个过程类似于Spark中的RDD,每⼀步的操作都是⼀个懒加载(transorm),当遇见action操作时才会计算结果,如果你不了解Spark也没有关系,因为这很好理解。总之df可以经历很多很多的操作,每⼀个操作的结果也可以使⽤变量进⾏保存,但它们都是⼀个懒加载,不会⽴即执⾏。所以相当于是记录了前前后后的⾎缘关系,"谁"通过"什么操作"得到了"谁",有⼈发现了这不就是DAG吗?是的,我们⼀开始就说了任务调度器就是使⽤了DAG的概念,⽽且Spark也是如此,所以这两者在某种程度上是⽐较相似的。当执⾏compute⽅法时,才会真正从头开始计算。我们还使⽤了ProgressBar来显⽰任务进度条,以及花费的时间,这是Dask提供的⼏种跟踪⼿段之⼀,
⽤于跟踪运⾏中的任务,在使⽤本地任务调度器尤其⽅便。由于我们⽬前是在本地、没有使⽤集群,所以调度器就是本地任务调度器。
lect = missing_count_pct[missing_count_pct != 0].index # 筛选出缺失值百分⽐不为零的列
with ProgressBar():
df_lect = df[lect].persist()
df_lect
有趣的是lect是⼀个pandas⾥的对象,但是我们可以将它和Dask DataFrame的⽅法⼀起使⽤,因为Dask DataFrame的每个分区都是⼀个pandas DataFrame。在这种情况下,pandas⾥的对象对所有线程都可⽤,因此它们可以在计算中使⽤它。在集群上运⾏时,pandas Series对象会被序列化并⼴播到所有⼯作节点。
另外我们看到除了compute之外,还有⼀个persist。这⾥调⽤compute返回是pandas中的对象,调⽤persist返回的依旧是Dask中的对象,这⾥是Dask DataFrame。但是这两者确实都发⽣了计算,从进度条我们也可以看出
来,df[lect].persist()返回的Dask DataFrame只有两个字段,证明Dask的确真正执⾏了df[lect]逻辑。
六级成绩查询时间2021
另外,persist根据任务调度器的不同还会有不同的表现,如果任务调度器⽀持异步计算,那么persist会⽴即返回,返回值包含⼀个Dask Future对象;但如果任务调度器只⽀持阻塞式计算,那么persist也会处于阻塞状态,并且在计算之后依旧返回Dask中的对象,这⾥是Dask DataFrame。我们下⾯还会啰嗦⼀下persist,但是相⽐你已经猜到persist的使⽤场景了。
使⽤persist让复杂的计算更⾼效
这⾥我们再来啰嗦⼀下persist,有些时候我们不需要全部的列,因此我们需要将不要的列过滤掉,否则每次计算时都要加载额外的列。但是我们不能使⽤compute,因为这样就直接得到pandas中的对象了,所以返回的依旧得是Dask中的对象。回想⼀下,任务图中的节点⼀旦执⾏,它的中间结果就会被丢弃,因为要最⼩化内存使⽤,没有persist的话,这意味着想对做⼀些额外的事情(查看DataFrame的前五⾏)的话,我们将不得不再次重新运⾏整个转换链。为了避免多次重复的计算,Dask允许我们存储计算的中间结果,以便重⽤它们,这样的话就不需要重头计算了,⽽这⼀步就通过persist来完成。此外,如果Dask需要内存时,可能会从内存中删除⼀些分区,这些被删除的分区将在需要时被动态计算。尽管重新计算分区需要花些时间,但是它仍然⽐重新计算整个任务图要快的多。所以如果你有⼀
个需要多次重⽤、并且⾮常⼤⾮常复杂的DAG,那么适当地使⽤persist进⾏持久化对于加速计算是⾮常有⽤的。
以上我们便结束了对Dask DataFrame的基本了解,现在你已经知道了如何通过⼏⾏代码读取数据并开始为探索性分析做准备。⽽上⾯的代码有⼀个最让⼈兴奋的特点,那就是⽆论你在单台机器处理还是在多台机器处理,⽆论分析的是⼏MB的数据还是⼏PB的数据,它们的⼯作原理都是⼀样的。另外由于它和pandas的代码⾮常相似,你可以对之前的代码进⾏很少的修改即可实现通过Dask执⾏并⾏计算。在后续系列我们将更深⼊地分析,但是⽬前我们有⼀个当务之急,我们需要挖掘⼀下Dask如何使⽤DAG来管理我们刚才说的任务分发。
可视化DAG
使⽤Dask Delayed对象可视化⼀个简单的DAG
我们之前使⽤的是Dask DataFrame,现在我们后退⼀步,降低⼀个级别:Dask Delayed对象。之所以这么做的原因是,即使是简单的DataFrame对象,Dask为其创建的DAG中的节点也会有很多,这会加⼤可视化的难度。
def inc(i):
return i + 1
def add(x, y):
return x + y
# 关于delayed我们后⾯会说,这⾥可以认为将函数变成⼀个Delayed对象
# 然后传参⽅式依旧不变
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
# 进⾏可视化
z.visualize()
这图是程序帮我们画的,⾃下⽽上分析的话,还是很形象的,两个inc函数执⾏得到的结果传递给add
函数。逻辑很简单,重点是⾥⾯的delayed函数,它是负责将普通的函数转化为⼀个Delayed对象。⽽且我们看到Delayed对象在调⽤时,可以接收普通的值,也可以接收别的Delayed对象在调⽤之后的返回值(当然最终得到的也是⼀个普通的值)。
type(x) # dask.delayed.Delayed
这图是程序帮我们画的,⾃下⽽上分析的话,还是很形象的,两个inc函数执⾏得到的结果传递给add函数。逻辑很简单,重点是⾥⾯的delayed函数,它是负责将普通的函数转化为⼀个Delayed对象。⽽Delayed对象代表了DAG中的节点(task),像我们之前的DataFrame是在Delayed之上的更⾼⼀级的对象,它对应多个Delayed对象。代码中的x表⽰函数inc的延迟求值,因为它不是被⽴刻执⾏的,当然Delayed对象还可以引⽤其它的Delayed对象,这⼀点从z的定义上也能看出来。这些Delayed对象连接在⼀起最终构成了⼀个图,对于要求值的z,⾸先要求x和y被计算出来。所以这便是⼀个简单的DAG⽰意图,对z的求值有⼀个很明显的依赖链,需要按照顺序求值,并且有⼀个准确定义的起点和终点。
从图中我们看到add函数是有依赖的,但是inc函数没有,因此如果inc函数执⾏时间⽐较的话,那么并⾏计算就很有意义。
使⽤循环和容器对象可视化⼀个复杂的DAG
让我们看⼀个稍微复杂的栗⼦。
import dask.delayed as delayed
def add_two(x):
return x + 2
data = [1, 5, 8, 10]
# step_1是⼀个列表, ⾥⾯是对add_two函数的延迟求值
step_1 = [delayed(add_two)(i) for i in data]
# 这⾥将内置函数sum也变成了Delayed对象
total = delayed(sum)(step_1)
total.visualize()
整体没什么难度,现在我们弄得再复杂⼀些。
def add_two(x):
return x + 2
def sum_two_numbers(x,y):
return x + y
雅思补习def multiply_four(x):
return x * 4
data = [1, 5, 8, 10]
step_1 = [delayed(add_two)(i) for i in data]
step_2 = [delayed(multiply_four)(j) for j in step_1]
total = delayed(sum)(step_1)
total.visualize()
所以我们看到可以将多个计算连在⼀起,⽽⽆需⽴即计算中间结果。
使⽤persist降低DAG的复杂性
shook我们继续,将上⼀步计算出的结果和⾃⼰本⾝再加起来,然后再求和。
import dask.delayed as delayed
def add_two(x):
return x + 2
def multiply_four(x):
return x * 4
def sum_two_numbers(x,y):
return x + y
data = [1, 5, 8, 10]
step_1 = [delayed(add_two)(i) for i in data]
step_2 = [delayed(multiply_four)(j) for j in step_1]
total = delayed(sum)(step_1)
# 将data中的每⼀个值都和total进⾏相加,得到data2
data2 = [delayed(sum_two_numbers)(k, total) for k in data]
# 然后对data2再进⾏求和
total2 = delayed(sum)(data2)
total2.visualize()
当然我相信逻辑依旧是很简单的,但问题是如果我们重复⼏次的话,那么这个DAG就会变得很⼤。类似的,如果我们的原始列表中有100个数字,不是4个,那么DAG也会变得⾮常⾮常的⼤,可以尝试⼀下。不过问题是为什么⼤型DAG难以处理,原因就是持久性。
正如之前提到的,每次在延迟对象上调⽤compute⽅法时,Dask都会逐步遍历完整的DAG来⽣成结果,这对于简单的计算来说是可以的。但如果处理的是⾮常⼤的分布式数据集,那么⼀次⼜⼀次的重
复计算很快会变得效率低下,⽽解决的⼀种办法就是持久化想要重⽤的中间结果,但是这对DAG来说会有什么影响呢。