Skip to content

Commit 06bf0da

Browse files
committed
fix #291
1 parent 3eefa7a commit 06bf0da

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

scheduler/helpers/queues/getters.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ def _get_connection(config: QueueConfiguration, use_strict_broker: bool = False)
4646
)
4747

4848

49+
def refresh_queue_connection(queue: Queue) -> None:
50+
"""Refreshes the connection of a given Queue"""
51+
queue_settings = get_queue_configuration(queue.name)
52+
connection = _get_connection(queue_settings)
53+
queue.refresh_connection(connection)
54+
55+
4956
def get_queue(name: str = "default") -> Queue:
5057
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
5158
queue_settings = get_queue_configuration(name)

scheduler/helpers/queues/queue_logic.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ def __init__(self, connection: ConnectionType, name: str, is_async: bool = True)
9191
self.scheduled_job_registry = ScheduledJobRegistry(connection=self.connection, name=self.name)
9292
self.canceled_job_registry = CanceledJobRegistry(connection=self.connection, name=self.name)
9393

94+
def refresh_connection(self, connection: ConnectionType) -> None:
95+
self.connection = connection
96+
self.queued_job_registry.connection = connection
97+
self.active_job_registry.connection = connection
98+
self.failed_job_registry.connection = connection
99+
self.finished_job_registry.connection = connection
100+
self.scheduled_job_registry.connection = connection
101+
self.canceled_job_registry.connection = connection
102+
94103
def __len__(self) -> int:
95104
return self.count
96105

scheduler/worker/worker.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from scheduler.types import ConnectionType, TimeoutErrorTypes, ConnectionErrorTypes, WatchErrorTypes, ResponseErrorTypes
2525
from .commands import WorkerCommandsChannelListener
2626
from .scheduler import WorkerScheduler, SchedulerStatus
27+
from ..helpers.queues.getters import refresh_queue_connection
2728
from ..redis_models.lock import QueueLock
2829
from ..redis_models.worker import WorkerStatus
2930

@@ -116,14 +117,14 @@ def __init__(
116117
self.job_monitoring_interval: int = job_monitoring_interval
117118
self.maintenance_interval = maintenance_interval
118119

119-
connection = self._set_connection(connection)
120-
self.connection = connection
120+
self.connection = self._set_connection(connection)
121121

122122
self.queues = [
123123
(Queue(name=q, connection=connection) if isinstance(q, str) else q) for q in _ensure_list(queues)
124124
]
125125
self.name: str = name
126-
self._validate_name_uniqueness()
126+
if model is None:
127+
self._validate_name_uniqueness()
127128
self._ordered_queues = self.queues[:]
128129

129130
self._is_job_execution_process: bool = False
@@ -155,6 +156,9 @@ def __init__(
155156

156157
@property
157158
def _pid(self) -> int:
159+
if self._model is None:
160+
logger.debug(f"[Worker {self.name}]: Worker model is None, returning 0 as PID")
161+
return 0
158162
return self._model.pid
159163

160164
def should_run_maintenance_tasks(self) -> bool:
@@ -600,16 +604,19 @@ def fork_job_execution_process(self, job: JobModel, queue: Queue) -> None:
600604
os.environ["SCHEDULER_JOB_NAME"] = job.name
601605
if child_pid == 0: # Child process/Job executor process to run the job
602606
os.setsid()
603-
self._model.job_execution_process_pid = os.getpid()
604-
self._model.save(connection=self.connection)
605-
self.execute_in_separate_process(job, queue)
607+
refresh_queue_connection(queue)
608+
self.connection = queue.connection
609+
self._model.set_field("job_execution_process_pid", os.getpid(), connection=queue.connection)
610+
worker = Worker.from_model(self._model)
611+
worker.execute_in_separate_process(job, queue)
606612
os._exit(0) # just in case
607613
else: # Parent worker process
608614
logger.debug(
609615
f"[Worker {self.name}/{self._pid}]: Forking job execution process, job_execution_process_pid={child_pid}"
610616
)
617+
refresh_queue_connection(queue)
611618
self._model.job_execution_process_pid = child_pid
612-
self._model.save(connection=self.connection)
619+
self._model.save(connection=queue.connection)
613620
self.procline(f"Forked {child_pid} at {time.time()}")
614621

615622
def get_heartbeat_ttl(self, job: JobModel) -> int:
@@ -732,6 +739,7 @@ def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
732739
random.seed()
733740
self.setup_job_execution_process_signals()
734741
self._is_job_execution_process = True
742+
job = JobModel.get(job.name, self.connection)
735743
try:
736744
self.perform_job(job, queue)
737745
except: # noqa
@@ -756,7 +764,7 @@ def worker_before_execution(self, job: JobModel, connection: ConnectionType) ->
756764
self._model.current_job_working_time = 0
757765
self._model.job_execution_process_pid = current_pid
758766
heartbeat_ttl = self.get_heartbeat_ttl(job)
759-
self._model.heartbeat(self.connection, heartbeat_ttl)
767+
self._model.heartbeat(connection, heartbeat_ttl)
760768
self.procline(
761769
f"[Worker {self.name}/{self._pid}]: Processing {job.func_name} from {job.queue_name} since {time.time()}"
762770
)
@@ -792,7 +800,7 @@ def handle_job_success(self, job: JobModel, return_value: Any, queue: Queue) ->
792800
self._model.completed_jobs += 1
793801
if job.started_at is not None and job.ended_at is not None:
794802
self._model.total_working_time_ms += (job.ended_at - job.started_at).microseconds / 1000.0
795-
self._model.save(connection=self.connection)
803+
self._model.save(connection=pipeline)
796804

797805
job.expire(job.success_ttl, connection=pipeline)
798806
logger.debug(f"[Worker {self.name}/{self._pid}]: Removing job {job.name} from active_job_registry")

0 commit comments

Comments
 (0)