Skip to content

Commit

Permalink
Fix autoworker with subprocess
Browse files Browse the repository at this point in the history
  • Loading branch information
ecarreras committed May 27, 2022
1 parent c96a4e0 commit 008a424
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions autoworker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import multiprocessing as mp
from uuid import uuid4
import subprocess

from redis import Redis
from rq.defaults import DEFAULT_RESULT_TTL
Expand Down Expand Up @@ -89,25 +90,28 @@ def num_connected_workers(self):
def worker(self):
"""Internal target to use in multiprocessing
"""
cleanup_ghosts(self.connection)
worker_class = import_attribute(self.config['worker_class'])

rq_params = {
'-b': '',
'-w': self.config['worker_class'],
'-n': '{}-auto'.format(uuid4().hex),
'-u': self.config['redis_url'],
'--results-ttl': self.default_result_ttl

}
if self.skip_failed:
exception_handlers = []
else:
exception_handlers = None
rq_params['--disable-default-exception-handler'] = ''

name = '{}-auto'.format(uuid4().hex)
worker = worker_class(
[self.queue], name=name, connection=self.connection,
exception_handlers=exception_handlers,
default_result_ttl=self.default_result_ttl
)
worker.work(burst=True)
command = ['rq', 'worker']
for k, v in rq_params.items():
command.append(str(k))
if v:
command.append(str(v))
command.append(self.queue.name)
subprocess.call(command)

def _create_worker(self):
child_pid = os.fork()
if child_pid == 0:
self.worker()
self.worker()

def work(self):
"""Spawn the multiple workers using multiprocessing and `self.worker`_
Expand Down

0 comments on commit 008a424

Please sign in to comment.