python并⾏计算pathos模块简介
⽬录
pathos模块
pathos是⼀个较为综合性的模块,既能多进程,也能多线程。其主要采⽤进程池/线程池⽅法。
pathos本⾝有⼀套进程池⽅法,同时也集成了multiprocess、pp模块的进程池⽅法。
1、pathos⾃⾝的多进程⽅法(pathos.multiprocessing.ProcessPool、pathos.multiprocessing.ProcessingPool、pathos.pools.ProcessPool)
(1)建⽴进程池
pathos.multiprocessing.ProcessPool(*args, **kwds) #建⽴pathos的进程池(pathos.multiprocessing.ProcessPool实例)。
pathos.multiprocessing.ProcessingPool(*args, **kwds) #同上。
pathos.pools.ProcessPool(*args, **kwds) #同上。
nodes:workers的数量。如果不指定nodes,则⾃动检测processors的数量(即ncpus)。
ncpus:worker processors的数量。
rvers:worker rvers的列表。
scheduler:相应的scheduler。
workdir:⽤于scratch calculations/files的$WORKDIR。
scatter:如为True,表⽰采⽤scatter-gatter(默认为worker-pool)。
source:如为Fal,表⽰尽可能少使⽤TemporaryFiles。
timeout:等待scheduler返回值的时间。中再
同样也有⼏个进程池通⽤的⽅法:
XXX.clo() #关闭进程池,关闭后不能往pool中增加新的⼦进程,然后可以调⽤join()函数等待已有⼦进程执⾏完毕。XXX为进程池。
XXX.join() #等待进程池中的⼦进程执⾏完毕。需在clo()函数后调⽤。XXX为进程池。
1def f(a, b = value):
2 pass
3
4pool = pathos.multiprocessing.Pool()
5pool.map(f, a_q, b_q)
6pool.clo()
7pool.join()
(2)创建⼦进程
(a)单个⼦进程可通过pipe⽅法创建:
XXX.pipe(f, *args, **kwds) #采⽤阻塞⽅式(⾮并⾏)提交⼀个任务,阻塞直到返回结果为⽌。XXX为进程池实例。
XXX.apipe(f, *args, **kwds) #异步(并⾏)提交⼀个任务到队列(queue)中,返回ApplyResult实例(其get⽅法可获得任务返回值,但get⽅法是阻塞的,应在所有⼦进程添加完后再调⽤)。XXX为进程池实例。
f(*args,**kwds)为⼦进程对应的活动。
(b)如果⼦进程有返回值,且返回值需要集中处理,则建议采⽤map⽅式(⼦进程活动允许多个参数):
(f, *args, **kwds) #采⽤阻塞⽅式按顺序运⾏⼀批任务,返回结果组成的list。func(iterable1[i], iterable2[i], ...)为⼦进程对应的活动。XXX为进程池实例。
XXX.amap(f, *args, **kwds) #XXX()的异步(并⾏)版本,返回MapResult实例(其具有get()⽅法,获取结果组成的list)。XXX为进程池实例。
1def f(a, b): #map⽅法允许多个参数
2 pass
3
4pool = pathos.multiprocessing.Pool()
5result = pool.map_async(f, (a0, a1, ...), (b0, b1, ...)).get()
6pool.clo()
7pool.join()
(c)如果内存不够⽤,也可采⽤imap迭代器⽅式:
XXX.imap(f, *args, **kwds) #()的⾮阻塞、按顺序迭代器版本,返回迭代器实例。XXX为进程池实例。
XXX.uimap(f, *args, **kwds) #XXX.imap()的⽆序版本(不会按照调⽤顺序返回,⽽是按照结束顺序返回),返回迭代器实例。XXX为进程池实例。
1def f(a, b):
2 pass
3
4pool = pathos.multiprocessing.Pool()
5result = pool.uimap(f, a_q, b_q)
6pool.clo()
7pool.join()
8
9for item in result:
10 pass
2、映射multiprocess模块的多进程⽅法(pathos.multiprocessing.Pool)
(1)建⽴进程池
pathos.multiprocessing.Pool(process=None, initializer=None, initargs=(), maxtasksperchild=None, context=None) #建⽴multiprocess的进程池。
process :使⽤的⼯作进程的数量,如果process是None那么使⽤ os.cpu_count()返回的数量。
initializer: 如果initializer不是None,那么每⼀个⼯作进程在开始的时候会调⽤initializer(*initargs)。
maxtasksperchild:⼯作进程退出之前可以完成的任务数,完成后⽤⼀个新的⼯作进程来替代原进程,来让闲置的资源被释放。
maxtasksperchild默认是None,意味着只要Pool存在⼯作进程就会⼀直存活。
context: ⽤在制定⼯作进程启动时的上下⽂,⼀般使⽤ multiprocess.Pool() 或者⼀个context对象的Pool()⽅法来创建⼀个池,两种⽅法都适当的设置了context。
(2)创建⼦进程
该进程池对应的创建⼦进程⽅法与multiprocess.Pool()(也即multiprocessing.Pool())完全相同。
3、映射pp模块的多进程⽅法1(pathos.pools.ParallelPool、pathos.pp.ParallelPool、
pathos.pp.ParallelPythonPool、pathos.parallel.ParallelPythonPool、pathos.parallel.ParallelPool)
(1)建⽴进程池
pathos.pp.ParallelPool(*args, **kwds) #建⽴映射pp模块⽅法的进程池,返回pathos.parallel.ParallelPool实例。注意,建⽴的进程池的⽅法与pp模块完全不同。
套裙pathos.pp.ParallelPythonPool(*args, **kwds) #等价pathos.pp.ParallelPool()。币重言甘
pathos.pools.ParallelPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
pathos.parallel.ParallelPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
pathos.parallel.ParallelPythonPool(*args, **kwds) #等价pathos.pp.ParallelPool()。
nodes:workers的数量。如果不指定nodes,则⾃动检测processors的数量(即ncpus)。
ncpus:worker processors的数量。
rvers:worker rvers的列表。
scheduler:相应的scheduler。
workdir:⽤于scratch calculations/files的$WORKDIR。
scatter:如为True,表⽰采⽤scatter-gatter(默认为worker-pool)。
source:如为Fal,表⽰尽可能少使⽤TemporaryFiles。
timeout:等待scheduler返回值的时间。
(2)创建⼦进程
该进程池对应的创建⼦进程⽅法与pathos.multiprocessing.ProcessPool()完全相同(与pp模块完全不同)。
注意,multiprocessing.Pipe()或multiprocess.Pipe()建⽴的管道对象⽆法传⼊⼦进程(可能是pickle错误)。但是,ParallelPool进程池中,⼦进程print函数可以直接输出到标准输出,因此也不必通过管道将信息传递到主进程了。但是,⼦进程print输出的格式经常出现异常,最好还是通过返回值在主进程输出。
⽽且,amap⽅法是个特例。amap⽅法中,如果⼦进程有print语句,会导致返回结果不对,只包含最后⼀个⼦进程返回值的tuple,⽽不是所有⼦进程的返回值组成完整list,原因暂不清楚。因此,amap⽅法中,⼦进程需要输出的内容只能通过返回值在主进程输出。
4、映射pp模块的多进程⽅法2(pathos.pp.pp模块)
该⽅法实质就是pp模块。
5、映射python内置map函数的⽅法(pathos.rial.SerialPool、pathos.pools.SerialPool)
该类⽅法实际是串⾏(⾮并⾏),不做具体介绍。
SerialPool建⽴的进程池实际只能⽤pipe、map、imap⽅法(均是阻塞的),不能使⽤apipe、amap、uimap⽅法。
实例(pathos模块)
(1)pathos.multiprocessing.ProcessPool(),pipe⽅法
5def f(x, conn, t0):
6 ans = 1
7 x0 = x
8 t = time.time() - t0
9 conn.nd('factorial of %d: start@%.2fs' % (x0, t))
10 while x > 1:
11 ans *= x
12 time.sleep(0.5)
13 x -= 1
14 t = time.time() - t0
15 conn.nd('factorial of %d: finish@%.2fs, res = %d' %(x0, t, ans))
16 return ans
17
18def main():
19 res = []
20 var = (4, 8, 12, 20, 16)
21 p = pathos.multiprocessing.ProcessPool()
22 p_conn, c_conn = multiprocess.Pipe()
23 t0 = time.time()
24 for i in var:
25 res.append(p.pipe(f, i, c_conn, t0))
26
27 print('output:')
护士交接班制度28 while p_conn.poll():
29 print(v())
30 t = time.time() - t0
31 print('factorial of %s@%.2fs: %s' % (var, t, res))
32
33if __name__ == '__main__':
34 main()
结果:可以看出,所有⼦进程都是逐个执⾏的。
1 output:
2factorial of 4: start@1.11s
3factorial of 4: finish@2.61s, res = 24
4factorial of 8: start@2.61s
5factorial of 8: finish@6.12s, res = 40320
6factorial of 12: start@6.12s
7factorial of 12: finish@11.62s, res = 479001600
8factorial of 20: start@11.63s
9factorial of 20: finish@21.13s, res = 2432902008176640000
10factorial of 16: start@21.15s
11factorial of 16: finish@28.65s, res = 20922789888000
12factorial of (4, 8, 12, 20, 16)@28.73s: [24, 40320, 479001600, 2432902008176640000, 20922789888000](2)pathos.multiprocessing.ProcessPool(),apipe⽅法
5def f(x, conn, t0):
6 ans = 1
7 x0 = x
8 t = time.time() - t0
9 conn.nd('factorial of %d: start@%.2fs' % (x0, t))
10 while x > 1:
11 ans *= x
12 time.sleep(0.5)
过肩发型13 x -= 1
14 t = time.time() - t0
15 conn.nd('factorial of %d: finish@%.2fs, res = %d' %(x0, t, ans))
16 return ans
17
18def main():
19 res = []
20 var = (4, 8, 12, 20, 16)
21 p = pathos.multiprocessing.ProcessPool()
22 p_conn, c_conn = multiprocess.Pipe()
23 t0 = time.time()
24 for i in var:
早安怎么说
25 res.append(p.apipe(f, i, c_conn, t0))
26 for i in range(len(res)):
27 res[i] = res[i].get()
28
29 print('output:')
30 while p_conn.poll():
31 print(v())
果果是谁32 t = time.time() - t0
33 print('factorial of %s@%.2fs: %s' % (var, t, res))
边城小说
34
35if __name__ == '__main__':
36 main()
结果:
1output:
2factorial of 4: start@1.10s
3factorial of 8: start@1.18s
4factorial of 12: start@1.19s
5factorial of 20: start@1.25s
6factorial of 4: finish@2.60s, res = 24
7factorial of 16: start@2.61s
8factorial of 8: finish@4.69s, res = 40320
9factorial of 12: finish@6.69s, res = 479001600
10factorial of 16: finish@10.11s, res = 20922789888000
11factorial of 20: finish@10.75s, res = 2432902008176640000
12factorial of (4, 8, 12, 20, 16)@10.85s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]
(3)pathos.multiprocessing.ProcessPool(),map⽅法
注意,实例将multiprocessing.Pipe()创建的连接作为参数传递给⼦进程,pickle出错,改为multiprocess.Pipe()创建连接即可解决。