Skip to content

Commit

Permalink
allow connect_multiprocess() and connect_thread() to pickle
Browse files Browse the repository at this point in the history
Nested Function fails pickle

  File "C:\Python\lib\site-packages\rpyc-4.1.5-py3.8.egg\rpyc\utils\factory.py", line 327, in connect_multiprocess
    t.start()
  File "C:\Python\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Python\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Python\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'connect_multiprocess.<locals>.server'
  • Loading branch information
Dan Dees committed Nov 2, 2020
1 parent e20074a commit 4f2c53c
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions rpyc/utils/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,32 @@ def connect_subproc(args, service=VoidService, config={}):
return conn


# callable, picklable functor
# was un-pickleable nested function
class _ServerFunctor:
"""Functor for use in connect calls"""

def __init__(self, listener, service, config, args=None):
""" Holds functor inputs """
self.listener = listener
self.service = service
self.config = config
self.args = args

def __call__(self):
""" connects to process/socket """
with closing(self.listener):
client = self.listener.accept()[0]
conn = connect_stream(SocketStream(client), service=self.service, config=self.config)
try:
if self.args is not None:
for k in self.args:
conn._local_root.exposed_namespace[k] = self.args[k]
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()


def connect_thread(service=VoidService, config={}, remote_service=VoidService, remote_config={}):
"""starts an rpyc server on a new thread, bound to an arbitrary port,
and connects to it over a socket.
Expand All @@ -277,17 +303,7 @@ def connect_thread(service=VoidService, config={}, remote_service=VoidService, r
listener.bind(("localhost", 0))
listener.listen(1)

def server(listener=listener):
with closing(listener):
client = listener.accept()[0]
conn = connect_stream(SocketStream(client), service=remote_service,
config=remote_config)
try:
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()

spawn(server)
spawn(_ServerFunctor(listener, remote_service, remote_config))
host, port = listener.getsockname()
return connect(host, port, service=service, config=config)

Expand All @@ -312,18 +328,7 @@ def connect_multiprocess(service=VoidService, config={}, remote_service=VoidServ
listener.bind(("localhost", 0))
listener.listen(1)

def server(listener=listener, args=args):
with closing(listener):
client = listener.accept()[0]
conn = connect_stream(SocketStream(client), service=remote_service, config=remote_config)
try:
for k in args:
conn._local_root.exposed_namespace[k] = args[k]
conn.serve_all()
except KeyboardInterrupt:
interrupt_main()

t = Process(target=server)
t = Process(target=_ServerFunctor(listener, remote_service, remote_config, args))
t.start()
host, port = listener.getsockname()
return connect(host, port, service=service, config=config)

0 comments on commit 4f2c53c

Please sign in to comment.