问题描述
我有一个简单的 UDPServer,它与 multiprocessing 一起工作.
I have simple UDPServer, which works with multiprocessing.
我想创建一个包含所有客户信息的列表.
I want to create a list, that contains information about all clients.
我使用Manager,但我不明白,如何在列表中追加信息 - 我需要转移Manager的对象来处理,但是如何?我使用新属性的方式不起作用.
I use Manager, but I don't understand, how to append information in list - I need transfer Manager`s object to handle, but how? My way with new attribute does not work.
import multiprocessing from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler from socket import socket, AF_INET, SOCK_DGRAM from settings import host, port, number_of_connections class ChatHandler(DatagramRequestHandler): def handle(self): cur_process = multiprocessing.current_process() data = self.request[0].strip() socket = self.request[1] ChatHandler.clients.append(self.client_address) # error here print(ChatHandler.clients) class ChatServer(ForkingMixIn, UDPServer): pass if __name__ == '__main__': server = ChatServer((host, port), ChatHandler) ChatHandler.clients = multiprocessing.Manager().list() server_process = multiprocessing.Process(target=server.serve_forever) server_process.daemon = False server_process.start()
如何解决这个问题?谢谢!
How to fix that? Thanks!
输出:
Exception happened during processing of request from ('127.0.0.1', 55679) Traceback (most recent call last): File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod conn = self._tls.connection AttributeError: 'ForkAwareLocal' object has no attribute 'connection' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request self.finish_request(request, client_address) File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request self.RequestHandlerClass(request, client_address, self) File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__ self.handle() File "server.py", line 15, in handle ChatHandler.clients.append(self.client_address) File "<string>", line 2, in append File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod self._connect() File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect conn = self._Client(self._token.address, authkey=self._authkey) File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client c = SocketClient(address) File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient s.connect(address) FileNotFoundError: [Errno 2] No such file or directory
推荐答案
问题是你让主进程在你启动工作进程后立即完成它的执行.当创建 multiprocessing.Manager 的进程完成执行时,Manager 服务器将关闭,这意味着您的共享列表对象现在已无用.发生这种情况是因为 Manager 对象将它的 shutdown 函数注册为 multiprocessing 模块的终结器",这意味着它将在进程退出.这是在 BaseManager.__init__ 中注册它的代码:
The problem is that you're letting the main process finish its execution immediately after you start the worker process. When the process that created the multiprocessing.Manager finishes its execution, the Manager server gets shut down, which means your shared list object is now useless. This happens because the Manager object registers it's shutdown function as a "finalizer" with the multiprocessing module, which means it will be run just before the process exits. Here's the code that registers it, in BaseManager.__init__:
# register a finalizer self._state.value = State.STARTED self.shutdown = util.Finalize( self, type(self)._finalize_manager, args=(self._process, self._address, self._authkey, self._state, self._Client), exitpriority=0 )
这是实际关闭的代码:
@staticmethod def _finalize_manager(process, address, authkey, state, _Client): ''' Shutdown the manager process; will be registered as a finalizer ''' if process.is_alive(): util.info('sending shutdown message to manager') try: conn = _Client(address, authkey=authkey) try: dispatch(conn, None, 'shutdown') finally: conn.close() except Exception: pass process.join(timeout=1.0) if process.is_alive(): util.info('manager still alive') if hasattr(process, 'terminate'): util.info('trying to `terminate()` manager process') process.terminate() process.join(timeout=0.1) if process.is_alive(): util.info('manager still alive after terminate') state.value = State.SHUTDOWN try: del BaseProxy._address_to_local[address] except KeyError: pass
解决方法很简单 - 不要让主进程立即完成,通过调用 server_process.join() 启动运行 UDP 服务器的进程:
The fix is simple - don't let the main process complete immediately you start the process that runs the UDP server, by calling server_process.join():
import multiprocessing from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler from socket import socket, AF_INET, SOCK_DGRAM from settings import host, port, number_of_connections class ChatHandler(DatagramRequestHandler): def handle(self): cur_process = multiprocessing.current_process() data = self.request[0].strip() socket = self.request[1] ChatHandler.clients.append(self.client_address) # error here print(ChatHandler.clients) class ChatServer(ForkingMixIn, UDPServer): pass if __name__ == '__main__': server = ChatServer((host, port), ChatHandler) ChatHandler.clients = multiprocessing.Manager().list() server_process = multiprocessing.Process(target=server.serve_forever) server_process.daemon = False server_process.start() server_process.join() # This fixes the issue.