问题描述
创建多处理/GUI 编码系统的最佳方法是什么?
What are the best ways to create a multiprocessing/ GUI coding system?
我想为互联网社区创建一个地方来查找有关如何在 python 中使用 multiprocessing 模块的示例.
I would like to create a place for the internet community to come and find examples on how to use the multiprocessing module in python.
我在 Internet 上看到了几个在主模块中调用的简单全局函数的 multiprocessing 进程的小示例,但我发现这很少能轻松转化为任何人实际使用的任何东西关于 GUI.我认为许多程序将具有他们想要在单独进程中使用的功能作为对象的方法(可能是其他对象的聚合等),并且可能单个 GUI 元素将具有需要调用它的关联对象流程等
I have seen several small examples of multiprocessing processes on the internet of simple global functions which are called in a main module, but I have found that this rarely translates easily into anything that anyone actually does with regard to GUIs. I would think that many programs would have the functions which they want to use in a separate process as methods of objects (which may be aggregates of other objects etc.) and perhaps a single GUI element would have an associated object that needs to call this process, etc.
例如,我有一个相对复杂的程序,但在为它获取响应式 GUI 时遇到问题,我认为这是由于我对 multiprocessing 和使用 的线程缺乏了解>QThread.但是,我知道下面给出的示例至少会以我想要的方式在进程之间传递信息(由于能够执行 print 语句),但我的 GUI 仍然处于锁定状态.有谁知道这可能是什么原因造成的,如果我对多线程/多处理架构缺乏了解,这仍然是一个问题?
For example, I have a relatively complex program and I am having problems in getting a responsive GUI for it, which I believed to be due to my lack of understanding in multiprocessing and threading with QThread. However, I do know that the example given below will at least pass information between processes in the manner I desire (due to being able to execute print statements) but my GUI is still locking. Does anyone know what may be causing this, and if it is still a probelm with my lack of understanding in mutlithreaded/multiprocessing architectures?
这是我正在做的一个小的伪代码示例:
Here is a small pseudo code example of what I am doing:
class Worker: ... def processing(self, queue): # put stuff into queue in a loop # This thread gets data from Worker class Worker_thread(QThread): def __init__(self): ... # make process with Worker inside def start_processing(self): # continuously get data from Worker # send data to Tab object with signals/slots class Tab(QTabWidget): # spawn a thread separate from main GUI thread # update GUI using slot def update_GUI()
这段代码是完全可编译的示例,体现了我的程序的覆盖结构:
And this code is fully compilable example which embodies the overlying sturcture of my program:
from PyQt4 import QtCore, QtGui import multiprocessing as mp import numpy as np import sys import time # This object can hold several properties which will be used for the processing # and will be run in the background, while it updates a thread with all of it's progress class Worker: def __init__(self, some_var): self.some_var = some_var self.iteration = 0 def some_complex_processing(self, queue): for i in range(0,5000): self.iteration += 1 queue.put(self.iteration) queue.put('done with processing') # This Woker_thread is a thread which will spawn a separate process (Worker). # This separate is needed in order to separate the data retrieval # from the main GUI thread, which should only quickly update when needed class Worker_thread(QtCore.QThread): # signals and slots are used to communicate back to the main GUI thread update_signal = QtCore.pyqtSignal(int) done_signal = QtCore.pyqtSignal() def __init__(self, parent, worker): QtCore.QThread.__init__(self, parent) self.queue = mp.Queue() self.worker = worker self.parent = parent self.process = mp.Process(target=self.worker.some_complex_processing, args=(self.queue,)) # When the process button is pressed, this function will start getting data from Worker # this data is then retrieved by the queue and pushed through a signal # to Tab.update_GUI @QtCore.pyqtSlot() def start_computation(self): self.process.start() while(True): try: message = self.queue.get() self.update_signal.emit(message) except EOFError: pass if message == 'done with processing': self.done_signal.emit() break #self.parent.update_GUI(message) self.process.join() return # Each tab will start it's own thread, which will spawn a process class Tab(QtGui.QTabWidget): start_comp = QtCore.pyqtSignal() def __init__(self, parent, this_worker): self.parent = parent self.this_worker = this_worker QtGui.QTabWidget.__init__(self, parent) self.treeWidget = QtGui.QTreeWidget(self) self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"]) self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"]) self.thread = Worker_thread(parent=self, worker=self.this_worker) self.thread.update_signal.connect(self.update_GUI) self.thread.done_signal.connect(self.thread.quit) self.start_comp.connect(self.thread.start_computation) self.thread.start() ############################### # Here is what should update the GUI at every iteration of Worker.some_complex_processing() # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated. @QtCore.pyqtSlot(int) def update_GUI(self, iteration): self.step.setText(0, str(iteration)) #time.sleep(0.1) print iteration def start_signal_emit(self): self.start_comp.emit() # GUI stuff class MainWindow(QtGui.QMainWindow): def __init__(self, parent = None): QtGui.QMainWindow.__init__(self) self.tab_list = [] self.setTabShape(QtGui.QTabWidget.Rounded) self.centralwidget = QtGui.QWidget(self) self.top_level_layout = QtGui.QGridLayout(self.centralwidget) self.tabWidget = QtGui.QTabWidget(self.centralwidget) self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25) process_button = QtGui.QPushButton("Process") self.top_level_layout.addWidget(process_button, 0, 1) QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process) self.setCentralWidget(self.centralwidget) self.centralwidget.setLayout(self.top_level_layout) # Make Tabs in loop from button for i in range(0,10): name = 'tab' + str(i) self.tab_list.append(Tab(self.tabWidget, Worker(name))) self.tabWidget.addTab(self.tab_list[-1], name) # Do the processing def process(self): for tab in self.tab_list: tab.start_signal_emit() return if __name__ == "__main__": app = QtGui.QApplication([]) win = MainWindow() win.show() sys.exit(app.exec_())
更多信息:我正在编写一个程序,我想从中产生几个进程,并让它们在整个处理过程中不断显示它们的进度.我希望程序是多处理的,以便尽可能地从程序中获得最佳速度.
More Information: I am writing a program which I would like to spawn several processes from and have them continuously show their progress throughout their processing. I would like the program to be multiprocessed in order to get the best speed out of the program as possible.
目前,我正在尝试使用线程来生成进程并使用信号和插槽来更新 GUI,同时队列不断检索数据.使用 print 语句时,queues、signals 和 slots 似乎可以工作,但无法更新 GUI.如果有人对我应该如何构建它以使程序更易于管理有任何其他建议,我想学习.
At the moment, I am trying to use a thread to spawn a process and use signals and slots to update the GUI while the data is continuously retrieved by a queue. It appears that the queues, signals, and slots work when using print statements, but can not update the GUI. If anyone has any other suggestions as to how I should structure this in order to keep the program more managable, I would like to learn.
编辑:我已经做了 Min Lin 提出的调整,除了使 Worker 成为 QObject 以便 moveToThread() 工作.
这是我目前拥有的新代码:
EDIT: I have made the adjustments put forth by Min Lin, with the addition of making Worker a QObject so that moveToThread() would work.
Here is the new code I have at the moment:
from PyQt4 import QtCore, QtGui import multiprocessing as mp import numpy as np import sys import time class Worker(QtCore.QObject): update_signal = QtCore.pyqtSignal(int) done_signal = QtCore.pyqtSignal() def __init__(self, some_var): QtCore.QObject.__init__(self, parent=None) self.some_var = some_var self.iteration = 0 self.queue = mp.Queue() self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,)) def some_complex_processing(self, queue): for i in range(0,5000): self.iteration += 1 queue.put(self.iteration) queue.put('done with processing') @QtCore.pyqtSlot() def start_computation(self): self.process.start() while(True): try: message = self.queue.get() self.update_signal.emit(message) except EOFError: pass if message == 'done with processing': self.done_signal.emit() break self.process.join() return class Tab(QtGui.QTabWidget): start_comp = QtCore.pyqtSignal() def __init__(self, parent, this_worker): self.parent = parent self.this_worker = this_worker QtGui.QTabWidget.__init__(self, parent) self.treeWidget = QtGui.QTreeWidget(self) self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"]) self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"]) # Use QThread is enough self.thread = QtCore.QThread(); # Change the thread affinity of worker to self.thread. self.this_worker.moveToThread(self.thread); self.this_worker.update_signal.connect(self.update_GUI) self.this_worker.done_signal.connect(self.thread.quit) self.start_comp.connect(self.this_worker.start_computation) self.thread.start() ############################### # Here is what should update the GUI at every iteration of Worker.some_complex_processing() # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated. @QtCore.pyqtSlot(int) def update_GUI(self, iteration): self.step.setText(0, str(iteration)) #time.sleep(0.1) print iteration def start_signal_emit(self): self.start_comp.emit() # GUI stuff class MainWindow(QtGui.QMainWindow): def __init__(self, parent = None): QtGui.QMainWindow.__init__(self) self.tab_list = [] self.setTabShape(QtGui.QTabWidget.Rounded) self.centralwidget = QtGui.QWidget(self) self.top_level_layout = QtGui.QGridLayout(self.centralwidget) self.tabWidget = QtGui.QTabWidget(self.centralwidget) self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25) process_button = QtGui.QPushButton("Process") self.top_level_layout.addWidget(process_button, 0, 1) QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process) self.setCentralWidget(self.centralwidget) self.centralwidget.setLayout(self.top_level_layout) # Make Tabs in loop from button for i in range(0,10): name = 'tab' + str(i) self.tab_list.append(Tab(self.tabWidget, Worker(name))) self.tabWidget.addTab(self.tab_list[-1], name) # Do the processing def process(self): for tab in self.tab_list: tab.start_signal_emit() return if __name__ == "__main__": app = QtGui.QApplication([]) win = MainWindow() win.show() sys.exit(app.exec_())
感谢您的所有回答,我很欣赏每个人在描述他们认为是解决方案的想法时所采用的详细程度,但不幸的是,我还没有能够执行在在 GUI 上显示对象的属性时它们所属的对象.
但是,我从这篇文章中学到了很多东西,这让我意识到我目前拥有的线程版本正在挂起 GUI,因为 GUI 更新功能太大并且需要太多处理.
Thank you for all of the answers, I appreciate the level of detail that everyone has gone into in describing the idea they believe to be solution, but unfortunately I have not yet been able to perform these types of processes which operate on the object they belong to while displaying the object's attribute on a GUI.
However, I have learned a decent amount from this post, which allowed me to realize that the threaded version I have at the moment is hanging the GUI since the GUI update function is too large and takes too much processing.
所以,我在我的多线程版本中采用了 QTimer() 方法,它的性能要好得多!我会建议任何面临类似问题的人至少尝试类似的事情.
So, I have taken the QTimer() approach to my multi-threaded version and it is performing much better! I would advise anyone facing similar problems to at least attempt something similar to this.
我不知道这种解决 GUI 更新问题的方法,现在它是对我所面临问题的伪或临时修复.
I was unaware of this approach to solving GUI update problems, and it is now a pseudo or temporary fix to the problem I am facing.
推荐答案
GUI 应用程序非常适合测试东西,因为它很容易生成新任务并可视化正在发生的事情,所以我写了一个小示例应用程序(屏幕截图,代码如下)因为我确实想自己学习.
A GUI application is perfect for testing stuff, as it is easy to spawn new tasks and visualize what is going on, so I wrote a little example app (Screenshot, code is below) as I did want to learn it for my self.
起初,我采取了与您类似的方法,尝试实现消费者/生产者模式,我在后台进程中苦苦挣扎,执行无限循环以等待新工作,并为自己负责来回通信.然后我发现了 Pool 接口,然后我可以用几行代码替换所有那些可怕的代码.您只需要一个池和几个回调:
At first, i took a similar approach as yours, trying to implement the Consumer/Producer pattern and I struggeled with Background Processes doing endless loops to wait for new jobs and took care of communication back and forth for myself. Then I found out about the Pool Interface and then I could replace all that hidious code with just a few lines. All you need is a single pool and a few callbacks:
#!/usr/bin/env python3 import multiprocessing, time, random, sys from PySide.QtCore import * # equivalent: from PyQt4.QtCore import * from PySide.QtGui import * # equivalent: from PyQt4.QtGui import * def compute(num): print("worker() started at %d" % num) random_number = random.randint(1, 6) if random_number in (2, 4, 6): raise Exception('Random Exception in _%d' % num) time.sleep(random_number) return num class MainWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.toolBar = self.addToolBar("Toolbar") self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask)) self.list = QListWidget() self.setCentralWidget(self.list) # Pool of Background Processes self.pool = multiprocessing.Pool(processes=4) def addTask(self): num_row = self.list.count() self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult, error_callback=self.receiveException) item = QListWidgetItem("item %d" % num_row) item.setForeground(Qt.gray) self.list.addItem(item) def receiveResult(self, result): assert isinstance(result, int) print("end_work(), where result is %s" % result) self.list.item(result).setForeground(Qt.darkGreen) def receiveException(self, exception): error = str(exception) _pos = error.find('_') + 1 num_row = int(error[_pos:]) item = self.list.item(num_row) item.setForeground(Qt.darkRed) item.setText(item.text() + ' Retry...') self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult, error_callback=self.receiveException) if __name__ == '__main__': app = QApplication(sys.argv) main_window = MainWindow() main_window.show() sys.exit(app.exec_())
我做了另一个例子,使用 QTimer 而不是回调,定期检查队列中的条目,更新 QProgressBar:
I did another example using a QTimer instead of Callbacks, checking periodically for Entries in a Queue, updating a QProgressBar:
#!/usr/bin/env python3 import multiprocessing, multiprocessing.pool, time, random, sys from PySide.QtCore import * from PySide.QtGui import * def compute(num_row): print("worker started at %d" % num_row) random_number = random.randint(1, 10) for second in range(random_number): progress = float(second) / float(random_number) * 100 compute.queue.put((num_row, progress,)) time.sleep(1) compute.queue.put((num_row, 100)) def pool_init(queue): # see http://stackoverflow.com/a/3843313/852994 compute.queue = queue class MainWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.toolBar = self.addToolBar("Toolbar") self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask)) self.table = QTableWidget() self.table.verticalHeader().hide() self.table.setColumnCount(2) self.setCentralWidget(self.table) # Pool of Background Processes self.queue = multiprocessing.Queue() self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,)) # Check for progress periodically self.timer = QTimer() self.timer.timeout.connect(self.updateProgress) self.timer.start(2000) def addTask(self): num_row = self.table.rowCount() self.pool.apply_async(func=compute, args=(num_row,)) label = QLabel("Queued") bar = QProgressBar() bar.setValue(0) self.table.setRowCount(num_row + 1) self.table.setCellWidget(num_row, 0, label) self.table.setCellWidget(num_row, 1, bar) def updateProgress(self): if self.queue.empty(): return num_row, progress = self.queue.get() # unpack print("received progress of %s at %s" % (progress, num_row)) label = self.table.cellWidget(num_row, 0) bar = self.table.cellWidget(num_row, 1) bar.setValue(progress) if progress == 100: label.setText('Finished') elif label.text() == 'Queued': label.setText('Downloading') self.updateProgress() # recursion if __name__ == '__main__': app = QApplication(sys.argv) main_window = MainWindow() main_window.show() sys.exit(app.exec_())