Problem description
I'm applying some parallelization to my code, in which I use classes. I knew that it is not possible to pickle a class method without some approach beyond what Python provides, and I found a solution here. In my code, I have two parts that should be parallelized, both using classes. Here I'm posting very simple code that just represents the structure of mine (it is the same, but I deleted the method contents, which were a lot of math and are irrelevant to the output I'm getting). The problem is that I can pickle one method (shepard_interpolation), but with the other one (calculate_orientation_uncertainty) I get a pickle error. I don't know why this is happening, or why it only partly works.
```python
def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    if func_name.startswith('__') and not func_name.endswith('__'):
        # deal with mangled names
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    print cls
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Sucess"

import copy_reg
import types
from itertools import product
from multiprocessing import Pool
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)


class VariabilityOfGradients(object):
    def __init__(self):
        pass

    @staticmethod
    def aux():
        return "VoG - Sucess"

    @staticmethod
    def calculate_orientation_uncertainty():
        results = []
        pool = Pool()
        for x, y in product(range(1, 5), range(1, 5)):
            result = pool.apply_async(VariabilityOfGradients.aux)
            results.append(result.get())
        pool.close()
        pool.join()


if __name__ == '__main__':
    results = []
    pool = Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()
```
When running, I get "PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed". This is almost the same as the error found here. The only difference that I see is that my methods are static.
I noticed that in my calculate_orientation_uncertainty, when I call the function as result = pool.apply_async(VariabilityOfGradients.aux()), i.e., with the parentheses (I never saw this in the doc examples), it seems to work. But when I try to get the result, I receive "TypeError: 'int' object is not callable"...
Any help would be appreciated. Thank you in advance.
Recommended answer
You could define a plain function at the module level and a staticmethod as well. This preserves the calling syntax, introspection and inheritability features of a staticmethod, while avoiding the pickling problem:
```python
def aux():
    return "VoG - Sucess"

class VariabilityOfGradients(object):
    aux = staticmethod(aux)
```
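To see why this sidesteps the pickling problem, it may help to check what attribute access on the class actually returns. The sketch below is written for Python 3 (the code in this post is Python 2), and `Child` is a hypothetical subclass added only for illustration. It suggests that `VariabilityOfGradients.aux` is the very same object as the module-level `aux`, which is what lets pickle find the function by its module-level name:

```python
def aux():
    return "VoG - Sucess"


class VariabilityOfGradients(object):
    # Expose the module-level function under the class namespace.
    aux = staticmethod(aux)


class Child(VariabilityOfGradients):  # hypothetical subclass, for illustration
    pass


# Attribute access unwraps the staticmethod and hands back the plain function.
same_object = VariabilityOfGradients.aux is aux
via_class = VariabilityOfGradients.aux()
via_instance = VariabilityOfGradients().aux()
via_subclass = Child.aux()
print(same_object, via_class, via_instance, via_subclass)
```

The calling syntax is unchanged from a regular staticmethod, and the subclass inherits `aux` as usual.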
For example,
```python
import copy_reg
import types
from itertools import product
import multiprocessing as mp

def _pickle_method(method):
    """
    Author: Steven Bethard (author of argparse)
    http://www.51sjk.com/Upload/Articles/1/0/338/338962_20221209141514667.jpg
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://www.51sjk.com/Upload/Articles/1/0/338/338962_20221209141514667.jpg
    """
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Success"

def aux():
    return "VoG - Sucess"

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

    @staticmethod
    def calculate_orientation_uncertainty():
        pool = mp.Pool()
        results = []
        for x, y in product(range(1, 5), range(1, 5)):
            # result = pool.apply_async(aux) # this works too
            result = pool.apply_async(VariabilityOfGradients.aux, callback=results.append)
        pool.close()
        pool.join()
        print(results)


if __name__ == '__main__':
    results = []
    pool = mp.Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()
```
yields
```
ImD - Success
ImD - Success
ImD - Success
['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess']
```
By the way, result.get() blocks the calling process until the function called by pool.apply_async (e.g. ImageData.shepard_interpolation) is completed. So
```python
for _ in range(3):
    result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
    results.append(result.get())
```
is really calling ImageData.shepard_interpolation sequentially, defeating the purpose of the pool.
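One way to observe this serialization is to log when each task starts and ends. The following is a minimal Python 3 sketch, not the code from the question: `work` is a hypothetical stand-in for `ImageData.shepard_interpolation`, and `multiprocessing.pool.ThreadPool` is assumed in place of the process pool so that the log can be shared and pickling does not come into play. It offers the same `apply_async` interface.

```python
import threading
from multiprocessing.pool import ThreadPool

events = []                 # chronological log of task starts and finishes
log_lock = threading.Lock()


def work(i):
    # Hypothetical stand-in for ImageData.shepard_interpolation.
    with log_lock:
        events.append("start %d" % i)
    with log_lock:
        events.append("end %d" % i)
    return i


pool = ThreadPool(3)
results = []
for i in range(3):
    result = pool.apply_async(work, args=[i])
    results.append(result.get())   # blocks here until task i has finished
pool.close()
pool.join()

print(events)
```

Because each `result.get()` waits for its task before the next one is submitted, the log is strictly `start 0, end 0, start 1, end 1, start 2, end 2`, even though three workers are available.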
Instead, you could use
```python
for _ in range(3):
    pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()], callback=results.append)
```
The callback function (e.g. results.append) is called in a thread of the calling process when the function is completed. It is sent one argument -- the return value of the function. Thus nothing blocks the three pool.apply_async calls from being made quickly, and the work done by the three calls to ImageData.shepard_interpolation will be performed concurrently.
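A small Python 3 sketch can make the concurrency visible. It rests on some assumptions: `work` is a hypothetical stand-in for `ImageData.shepard_interpolation`, `multiprocessing.pool.ThreadPool` is used in place of the process pool so the example does not depend on pickling, and a `threading.Barrier` is added purely as a check that all three tasks are in flight at once.

```python
import threading
from multiprocessing.pool import ThreadPool

N_TASKS = 3
# Every task waits here until all N_TASKS tasks are running at the same time.
barrier = threading.Barrier(N_TASKS)


def work(i):
    # Hypothetical stand-in for ImageData.shepard_interpolation.
    # If the tasks ran one by one, this wait would time out and raise
    # BrokenBarrierError instead of returning.
    barrier.wait(timeout=10)
    return i * i


results = []
pool = ThreadPool(N_TASKS)
for i in range(N_TASKS):
    # Returns immediately; results.append is invoked later with the return value.
    pool.apply_async(work, args=[i], callback=results.append)
pool.close()
pool.join()

print(sorted(results))
```

The callbacks fire in completion order, so `results` is sorted before printing. If input order matters, keeping the `AsyncResult` objects and calling `get()` on them after the loop is another option.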
Alternatively, it might be simpler to just use pool.map here.
```python
results = pool.map(ImageData.shepard_interpolation, [ImageData()]*3)
```
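As a rough illustration of the `pool.map` variant, here is a self-contained Python 3 sketch. The return value of `shepard_interpolation` is hypothetical (the original method only prints), and `multiprocessing.pool.ThreadPool` is assumed as a stand-in for the process pool so that no pickling helper is needed:

```python
from multiprocessing.pool import ThreadPool


class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height

    def shepard_interpolation(self, seeds=20):
        # Hypothetical return value; the original method only prints.
        return self.width * self.height


images = [ImageData(width=w) for w in (1, 2, 3)]
pool = ThreadPool(3)
# Each list item is passed as `self`; results come back in input order.
results = pool.map(ImageData.shepard_interpolation, images)
pool.close()
pool.join()

print(results)
```

Unlike the callback approach, `pool.map` blocks until every task is done and returns the results in the order of the inputs. With a process pool each argument is pickled and sent to a worker, so `[ImageData()]*3` gives each call its own copy there, whereas with threads the same object would be shared.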