-
-
Notifications
You must be signed in to change notification settings - Fork 471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[16.0][IMP] queue_job: remove dead jobs requeuer cron and automatically requeue dead jobs #716
base: 16.0
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -238,6 +238,34 @@ | |
recordset = cls.db_records_from_uuids(env, job_uuids) | ||
return {cls._load_from_db_record(record) for record in recordset} | ||
|
||
def lock(self): | ||
self.env.cr.execute( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please add the intent of this |
||
""" | ||
SELECT | ||
* | ||
FROM | ||
queue_job_locks | ||
WHERE | ||
id in ( | ||
SELECT | ||
id | ||
FROM | ||
queue_job | ||
WHERE | ||
uuid = %s | ||
AND state='started' | ||
) | ||
FOR UPDATE; | ||
""", | ||
[self.uuid], | ||
) | ||
AnizR marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# 1 job should be locked | ||
if not 1 == len(self.env.cr.fetchall()): | ||
raise RetryableJobError( | ||
f"Trying to lock job that wasn't started, uuid: {self.uuid}" | ||
) | ||
|
||
@classmethod | ||
def _load_from_db_record(cls, job_db_record): | ||
stored = job_db_record | ||
|
@@ -517,6 +545,11 @@ | |
|
||
The job is executed with the user which has initiated it. | ||
""" | ||
if self.max_retries and self.retry >= self.max_retries: | ||
raise FailedJobError( | ||
"Job: %s, Max. retries (%d) reached" % (self.uuid, self.max_retries) | ||
) | ||
|
||
self.retry += 1 | ||
try: | ||
self.result = self.func(*tuple(self.args), **self.kwargs) | ||
|
@@ -820,6 +853,23 @@ | |
self.date_started = datetime.now() | ||
self.worker_pid = os.getpid() | ||
|
||
# add job to list of lockable jobs | ||
self.env.cr.execute( | ||
""" | ||
INSERT INTO | ||
queue_job_locks (id) | ||
SELECT | ||
id | ||
FROM | ||
queue_job | ||
WHERE | ||
uuid = %s | ||
ON CONFLICT(id) | ||
DO NOTHING; | ||
""", | ||
[self.uuid], | ||
) | ||
|
||
def set_done(self, result=None): | ||
self.state = DONE | ||
self.exc_name = None | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,22 +114,6 @@ | |
* After creating a new database or installing queue_job on an | ||
existing database, Odoo must be restarted for the runner to detect it. | ||
|
||
* When Odoo shuts down normally, it waits for running jobs to finish. | ||
However, when the Odoo server crashes or is otherwise force-stopped, | ||
running jobs are interrupted while the runner has no chance to know | ||
they have been aborted. In such situations, jobs may remain in | ||
``started`` or ``enqueued`` state after the Odoo server is halted. | ||
Since the runner has no way to know if they are actually running or | ||
not, and does not know for sure if it is safe to restart the jobs, | ||
it does not attempt to restart them automatically. Such stale jobs | ||
therefore fill the running queue and prevent other jobs to start. | ||
You must therefore requeue them manually, either from the Jobs view, | ||
or by running the following SQL statement *before starting Odoo*: | ||
|
||
.. code-block:: sql | ||
|
||
update queue_job set state='pending' where state in ('started', 'enqueued') | ||
|
||
.. rubric:: Footnotes | ||
|
||
.. [1] From a security standpoint, it is safe to have an anonymous HTTP | ||
|
@@ -155,7 +139,7 @@ | |
from odoo.tools import config | ||
|
||
from . import queue_job_config | ||
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager | ||
from .channels import ENQUEUED, NOT_DONE, ChannelManager | ||
|
||
SELECT_TIMEOUT = 60 | ||
ERROR_RECOVERY_DELAY = 5 | ||
|
@@ -207,35 +191,14 @@ | |
|
||
|
||
def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): | ||
# Method to set failed job (due to timeout, etc) as pending, | ||
# to avoid keeping it as enqueued. | ||
def set_job_pending(): | ||
connection_info = _connection_info_for(db_name) | ||
conn = psycopg2.connect(**connection_info) | ||
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) | ||
with closing(conn.cursor()) as cr: | ||
cr.execute( | ||
"UPDATE queue_job SET state=%s, " | ||
"date_enqueued=NULL, date_started=NULL " | ||
"WHERE uuid=%s and state=%s " | ||
"RETURNING uuid", | ||
(PENDING, job_uuid, ENQUEUED), | ||
) | ||
if cr.fetchone(): | ||
_logger.warning( | ||
"state of job %s was reset from %s to %s", | ||
job_uuid, | ||
ENQUEUED, | ||
PENDING, | ||
) | ||
|
||
# TODO: better way to HTTP GET asynchronously (grequest, ...)? | ||
# if this was python3 I would be doing this with | ||
# asyncio, aiohttp and aiopg | ||
def urlopen(): | ||
url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format( | ||
scheme, host, port, db_name, job_uuid | ||
) | ||
# pylint: disable=except-pass | ||
try: | ||
auth = None | ||
if user: | ||
|
@@ -249,10 +212,9 @@ | |
# for codes between 500 and 600 | ||
response.raise_for_status() | ||
except requests.Timeout: | ||
set_job_pending() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A timeout here is normal behaviour, so we don't want to log it as an exception. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. right 👍 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please add a comment on why there is |
||
pass | ||
except Exception: | ||
_logger.exception("exception in GET %s", url) | ||
set_job_pending() | ||
|
||
thread = threading.Thread(target=urlopen) | ||
thread.daemon = True | ||
|
@@ -343,6 +305,60 @@ | |
(ENQUEUED, uuid), | ||
) | ||
|
||
def requeue_dead_jobs(self): | ||
""" | ||
Set started and enqueued jobs but not locked to pending | ||
|
||
A job is locked when it's being executed | ||
When a job is killed, it releases the lock | ||
|
||
Adding a buffer on 'date_enqueued' to check | ||
that it has been enqueued for more than 10sec. | ||
This prevents from requeuing jobs before they are actually started. | ||
|
||
When Odoo shuts down normally, it waits for running jobs to finish. | ||
However, when the Odoo server crashes or is otherwise force-stopped, | ||
running jobs are interrupted while the runner has no chance to know | ||
they have been aborted. | ||
""" | ||
|
||
with closing(self.conn.cursor()) as cr: | ||
query = """ | ||
UPDATE | ||
queue_job | ||
SET | ||
state='pending', | ||
retry=(CASE WHEN state='started' THEN retry+1 ELSE retry END) | ||
WHERE | ||
id in ( | ||
SELECT | ||
id | ||
FROM | ||
queue_job_locks | ||
WHERE | ||
id in ( | ||
SELECT | ||
id | ||
FROM | ||
queue_job | ||
WHERE | ||
state IN ('enqueued','started') | ||
AND date_enqueued < | ||
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec') | ||
) | ||
FOR UPDATE SKIP LOCKED | ||
AnizR marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
RETURNING uuid | ||
""" | ||
|
||
cr.execute(query) | ||
|
||
for (uuid,) in cr.fetchall(): | ||
_logger.warning( | ||
"Re-queued job with uuid: %s", | ||
uuid, | ||
) | ||
|
||
|
||
class QueueJobRunner(object): | ||
def __init__( | ||
|
@@ -424,6 +440,11 @@ | |
self.channel_manager.notify(db_name, *job_data) | ||
_logger.info("queue job runner ready for db %s", db_name) | ||
|
||
def requeue_dead_jobs(self): | ||
for db in self.db_by_name.values(): | ||
if db.has_queue_job: | ||
db.requeue_dead_jobs() | ||
|
||
sbidoul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def run_jobs(self): | ||
now = _odoo_now() | ||
for job in self.channel_manager.get_jobs_to_run(now): | ||
|
@@ -516,6 +537,7 @@ | |
_logger.info("database connections ready") | ||
# inner loop does the normal processing | ||
while not self._stop: | ||
self.requeue_dead_jobs() | ||
self.process_notifications() | ||
self.run_jobs() | ||
self.wait_notification() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the manifest, the version is 2.8.0, could you update this file accordingly @AnizR? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) | ||
|
||
|
||
def migrate(cr, version): | ||
# Create job lock table | ||
cr.execute( | ||
""" | ||
CREATE TABLE IF NOT EXISTS queue_job_locks ( | ||
id INT PRIMARY KEY, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. why not use There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @hparfr we probably could but I don't think it would significantly change the main query in There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. thanks for the clarification |
||
CONSTRAINT | ||
queue_job_locks_queue_job_id_fkey | ||
FOREIGN KEY (id) | ||
REFERENCES queue_job (id) ON DELETE CASCADE | ||
); | ||
""" | ||
) | ||
|
||
# Deactivate cron garbage collector | ||
cr.execute( | ||
""" | ||
UPDATE | ||
ir_cron | ||
SET | ||
active=False | ||
WHERE id IN ( | ||
SELECT res_id | ||
FROM | ||
ir_model_data | ||
WHERE | ||
module='queue_job' | ||
AND model='ir.cron' | ||
AND name='ir_cron_queue_job_garbage_collector' | ||
); | ||
""" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this become
to clean up upgrades / avoid filling cron logs with errors since the requeue_stuck_jobs method is gone?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have archived the cron in the
pre-migration.py
. Therefore, there won't be any error. I always prefer to archive (set active to false) rather than deleting.