问题描述
我在做一些并行处理,如下:
I am doing some parallel processing, as follows:
with mp.Pool(8) as tmpPool: results = tmpPool.starmap(my_function, inputs)
输入如下所示:[(1,0.2312),(5,0.52) ...]即,int 和 float 的元组.
where inputs look like: [(1,0.2312),(5,0.52) ...] i.e., tuples of an int and a float.
代码运行良好,但我似乎无法将其包裹在加载栏 (tqdm) 周围,例如可以使用 imap 方法完成,如下所示:
The code runs nicely, yet I cannot seem to wrap it around a loading bar (tqdm), such as can be done with e.g., imap method as follows:
tqdm.tqdm(mp.imap(some_function,some_inputs))
星图也可以这样做吗?
谢谢!
推荐答案
starmap() 是不行的,但是通过添加 Pool.istarmap() 的补丁是可以的>.它基于 imap() 的代码.您所要做的就是创建 istarmap.py-文件并导入模块以应用补丁,然后再进行常规的多处理导入.
It's not possible with starmap(), but it's possible with a patch adding Pool.istarmap(). It's based on the code for imap(). All you have to do, is create the istarmap.py-file and import the module to apply the patch before you make your regular multiprocessing-imports.
Python <3.8
# istarmap.py for Python <3.8 import multiprocessing.pool as mpp def istarmap(self, func, iterable, chunksize=1): """starmap-version of imap """ if self._state != mpp.RUN: raise ValueError("Pool not running") if chunksize < 1: raise ValueError( "Chunksize must be 1+, not {0:n}".format( chunksize)) task_batches = mpp.Pool._get_tasks(func, iterable, chunksize) result = mpp.IMapIterator(self._cache) self._taskqueue.put( ( self._guarded_task_generation(result._job, mpp.starmapstar, task_batches), result._set_length )) return (item for chunk in result for item in chunk) mpp.Pool.istarmap = istarmap
Python 3.8+
# istarmap.py for Python 3.8+ import multiprocessing.pool as mpp def istarmap(self, func, iterable, chunksize=1): """starmap-version of imap """ self._check_running() if chunksize < 1: raise ValueError( "Chunksize must be 1+, not {0:n}".format( chunksize)) task_batches = mpp.Pool._get_tasks(func, iterable, chunksize) result = mpp.IMapIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, mpp.starmapstar, task_batches), result._set_length )) return (item for chunk in result for item in chunk) mpp.Pool.istarmap = istarmap
然后在你的脚本中:
import istarmap # import to apply patch from multiprocessing import Pool import tqdm def foo(a, b): for _ in range(int(50e6)): pass return a, b if __name__ == '__main__': with Pool(4) as pool: iterable = [(i, 'x') for i in range(10)] for _ in tqdm.tqdm(pool.istarmap(foo, iterable), total=len(iterable)): pass