Skip to content

Commit

Permalink
[IMP] queue_job: detect jobs runned by workers that have been killed
Browse files Browse the repository at this point in the history
  • Loading branch information
AnizR committed Dec 3, 2024
1 parent 20b0e93 commit 8aa3e58
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import random
from datetime import datetime, timedelta

import psutil

from odoo import _, api, exceptions, fields, models
from odoo.osv import expression
from odoo.tools import config, html_escape
Expand Down Expand Up @@ -417,6 +419,31 @@ def autovacuum(self):
break
return True

def _check_job_worker_pid(self):
"""
Checking that job's worker pids still exist
If not, it means that the worker has been killed
"""
jobs = self.env["queue.job"].search(

Check warning on line 427 in queue_job/models/queue_job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/models/queue_job.py#L427

Added line #L427 was not covered by tests
[
("state", "=", "started"),
("worker_pid", "!=", False),
]
)

for job in jobs:
if not psutil.pid_exists(job.worker_pid):
_job = Job.load(job.env, job.uuid)
_job.set_failed(

Check warning on line 437 in queue_job/models/queue_job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/models/queue_job.py#L436-L437

Added lines #L436 - L437 were not covered by tests
exc_name=_("WorkerError"),
exc_info=_(
"The worker executing the job was killed."
"This is likely to be due to a timeout"
),
exc_message=_("Associated worker was killed"),
)
_job.store()

Check warning on line 445 in queue_job/models/queue_job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/models/queue_job.py#L445

Added line #L445 was not covered by tests

def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
"""Fix jobs that are in a bad states
Expand All @@ -431,6 +458,9 @@ def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
"""
if started_delta == -1:
started_delta = (config["limit_time_real"] // 60) + 1

self._check_job_worker_pid()

Check warning on line 462 in queue_job/models/queue_job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/models/queue_job.py#L462

Added line #L462 was not covered by tests

return self._get_stuck_jobs_to_requeue(
enqueued_delta=enqueued_delta, started_delta=started_delta
).requeue()
Expand Down

0 comments on commit 8aa3e58

Please sign in to comment.