Problem description
I am trying to implement this multiprocessing tutorial for my own purposes. At first I thought it did not scale well, but when I made a reproducible example I found that if the list of items goes above 124, it seems to never return an answer. At x = 124 it runs in .4 seconds, but when I set it to x = 125 it never finishes. I am running Python 2.7 on Windows 7.
from multiprocessing import Lock, Process, Queue, current_process
import time

class Testclass(object):
    def __init__(self, x):
        self.x = x

def toyfunction(testclass):
    testclass.product = testclass.x * testclass.x
    return testclass

def worker(work_queue, done_queue):
    try:
        for testclass in iter(work_queue.get, 'STOP'):
            print(testclass.counter)
            newtestclass = toyfunction(testclass)
            done_queue.put(newtestclass)
    except:
        print('error')
    return True

def main(x):
    counter = 1
    database = []
    while counter <= x:
        database.append(Testclass(10))
        counter += 1
        print(counter)

    workers = 8
    work_queue = Queue()
    done_queue = Queue()
    processes = []
    start = time.clock()
    counter = 1

    for testclass in database:
        testclass.counter = counter
        work_queue.put(testclass)
        counter += 1
        print(counter)

    print('items loaded')
    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    for p in processes:
        p.join()

    done_queue.put('STOP')
    newdatabase = []
    for testclass in iter(done_queue.get, 'STOP'):
        newdatabase.append(testclass)

    print(time.clock()-start)
    print("Done")
    return(newdatabase)

if __name__ == '__main__':
    database = main(124)
    database2 = main(125)
Recommended answer
OK! From the docs:
Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.
As I noted in a comment earlier, the code attempts to .join() the worker processes before the done_queue Queue is drained. After I changed the code in a funky way to be sure done_queue was drained before .join()'ing, it worked fine for a million items.
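One way to get the "drain before join" ordering is sketched below. It is not necessarily the change referred to above, just a minimal illustration. It assumes every input item produces exactly one result, so the parent can count results instead of relying on a 'STOP' sentinel on done_queue. The names square, run and n_workers are hypothetical and do not appear in the original code.

```python
from multiprocessing import Process, Queue


def square(x):
    # Stand-in for the real per-item work (hypothetical helper).
    return x * x


def worker(work_queue, done_queue):
    # Consume items until the 'STOP' sentinel arrives.
    for item in iter(work_queue.get, 'STOP'):
        done_queue.put(square(item))


def run(n_items, n_workers=4):
    work_queue = Queue()
    done_queue = Queue()

    for i in range(n_items):
        work_queue.put(i)

    processes = []
    for _ in range(n_workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')  # one sentinel per worker

    # Drain first. Assumption: exactly one result per input item,
    # so n_items calls to get() empty done_queue completely.
    results = [done_queue.get() for _ in range(n_items)]

    # Only now join: the workers have nothing left buffered,
    # so they can flush and exit.
    for p in processes:
        p.join()

    return results


if __name__ == '__main__':
    results = run(1000)
    print(len(results))
```

Results come back in completion order, not input order, so sort them or carry an index along with each item if order matters.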
So this is a case of pilot error, although quite obscure. As to why behavior depends on the number passed to main(x), it's unpredictable: it depends on how buffering is done internally. Such fun ;-)
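The warning's last remark, that a queue created using a manager does not have this issue, can be illustrated with a small sketch. Here the child is joined before the parent reads anything, which is the ordering that can deadlock with a plain multiprocessing.Queue. The names producer and run_with_manager_queue are hypothetical, and a manager queue is generally slower because every operation goes through a separate server process.

```python
from multiprocessing import Manager, Process


def producer(queue, n_items):
    # Each put() is forwarded to the manager's server process,
    # so nothing stays buffered inside this child.
    for i in range(n_items):
        queue.put(i)


def run_with_manager_queue(n_items):
    manager = Manager()
    queue = manager.Queue()

    p = Process(target=producer, args=(queue, n_items))
    p.start()
    p.join()  # joining before draining is safe with a manager queue

    results = [queue.get() for _ in range(n_items)]
    manager.shutdown()
    return results


if __name__ == '__main__':
    print(len(run_with_manager_queue(10000)))
```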