From f43b30b13f0f77f8f6d2ea556684523d5e8428fe Mon Sep 17 00:00:00 2001 From: Eduard Carrerars Date: Fri, 27 May 2022 13:05:34 +0200 Subject: [PATCH] Fix autoworker with subprocess --- autoworker/__init__.py | 46 ++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/autoworker/__init__.py b/autoworker/__init__.py index cfe2bc6..71ebeee 100644 --- a/autoworker/__init__.py +++ b/autoworker/__init__.py @@ -3,6 +3,7 @@ import os import multiprocessing as mp from uuid import uuid4 +import subprocess from redis import Redis from rq.defaults import DEFAULT_RESULT_TTL @@ -86,37 +87,34 @@ def num_connected_workers(self): ) ]) - def worker(self): + @property + def worker_command(self): """Internal target to use in multiprocessing """ - cleanup_ghosts(self.connection) - worker_class = import_attribute(self.config['worker_class']) - if self.skip_failed: - exception_handlers = [] - else: - exception_handlers = None - name = '{}-auto'.format(uuid4().hex) - worker = worker_class( - [self.queue], name=name, connection=self.connection, - exception_handlers=exception_handlers, - default_result_ttl=self.default_result_ttl - ) - worker.work(burst=True) + rq_params = { + '-b': '', + '-w': self.config['worker_class'], + '-n': '{}-auto'.format(uuid4().hex), + '-u': self.config['redis_url'], + '--results-ttl': self.default_result_ttl + + } + if self.skip_failed: + rq_params['--disable-default-exception-handler'] = '' - def _create_worker(self): - child_pid = os.fork() - if child_pid == 0: - self.worker() + command = ['rq', 'worker'] + for k, v in rq_params.items(): + command.append(str(k)) + if v: + command.append(str(v)) + command.append(self.queue.name) + return command def work(self): """Spawn the multiple workers using multiprocessing and `self.worker`_ targget """ max_procs = self.max_procs - self.num_connected_workers() - self.processes = [ - mp.Process(target=self._create_worker) for _ in range(0, max_procs) - ] - for proc in self.processes: - proc.daemon = False - proc.start() + for _ in range(0, max_procs): + subprocess.Popen(self.worker_command, close_fds=True)