From 2fccc8d6041bdd8058df4e0a7d73c6db6c1835ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Tue, 2 Jul 2024 13:50:11 +0200 Subject: [PATCH 1/4] [FIX] queue_job: close connection to databases without job queue Without this, we leak connections to Databases that don't have queue_job installed. --- queue_job/jobrunner/runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 47417caa4f..a2be314815 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -423,6 +423,8 @@ def initialize_databases(self): for job_data in cr: self.channel_manager.notify(db_name, *job_data) _logger.info("queue job runner ready for db %s", db_name) + else: + db.close() def run_jobs(self): now = _odoo_now() From 72c72a326216090a9cc397c01ceb2c443e3e82d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Tue, 2 Jul 2024 13:45:08 +0200 Subject: [PATCH 2/4] [FIX] queue_job: handle exceptions in Database constructor Without this we risk connection leaks in case of exceptions in the constructor. --- queue_job/jobrunner/runner.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index a2be314815..a3ffda688c 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -264,10 +264,14 @@ def __init__(self, db_name): self.db_name = db_name connection_info = _connection_info_for(db_name) self.conn = psycopg2.connect(**connection_info) - self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - self.has_queue_job = self._has_queue_job() - if self.has_queue_job: - self._initialize() + try: + self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + self.has_queue_job = self._has_queue_job() + if self.has_queue_job: + self._initialize() + except BaseException: + self.close() + raise def close(self): # pylint: disable=except-pass From d3bbcecad008c22c9b9bbdaf2ff58fb47a4214e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Tue, 2 Jul 2024 13:06:14 +0200 Subject: [PATCH 3/4] [IMP] queue_job: HA job runner using session level advisory lock --- queue_job/jobrunner/runner.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index a3ffda688c..afaaa90200 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -159,12 +159,17 @@ SELECT_TIMEOUT = 60 ERROR_RECOVERY_DELAY = 5 +PG_ADVISORY_LOCK_ID = 2293787760715711918 _logger = logging.getLogger(__name__) select = selectors.DefaultSelector +class MasterElectionLost(Exception): + pass + + # Unfortunately, it is not possible to extend the Odoo # server command line arguments, so we resort to environment variables # to configure the runner (channels mostly). @@ -268,6 +273,7 @@ def __init__(self, db_name): self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.has_queue_job = self._has_queue_job() if self.has_queue_job: + self._acquire_master_lock() self._initialize() except BaseException: self.close() @@ -284,6 +290,14 @@ def close(self): pass self.conn = None + def _acquire_master_lock(self): + """Acquire the master runner lock or raise MasterElectionLost""" + with closing(self.conn.cursor()) as cr: + cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,)) + if not cr.fetchone()[0]: + msg = f"could not acquire master runner lock on {self.db_name}" + raise MasterElectionLost(msg) + def _has_queue_job(self): with closing(self.conn.cursor()) as cr: cr.execute( @@ -406,7 +420,7 @@ def get_db_names(self): db_names = config["db_name"].split(",") else: db_names = odoo.service.db.list_dbs(True) - return db_names + return sorted(db_names) def close_databases(self, remove_jobs=True): for db_name, db in self.db_by_name.items(): @@ -515,7 +529,7 @@ def run(self): while not self._stop: # outer loop does exception recovery try: - _logger.info("initializing database connections") + _logger.debug("initializing database connections") # TODO: how to detect new databases or databases # on which queue_job is installed after server start? self.initialize_databases() @@ -530,6 +544,14 @@ def run(self): except InterruptedError: # Interrupted system call, i.e. KeyboardInterrupt during select self.stop() + except MasterElectionLost as e: + _logger.debug( + "master election lost: %s, sleeping %ds and retrying", + e, + ERROR_RECOVERY_DELAY, + ) + self.close_databases() + time.sleep(ERROR_RECOVERY_DELAY) except Exception: _logger.exception( "exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY From 992bc9bfc0b80c83eee1aa2079c3b44dd8cd5ea1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Tue, 2 Jul 2024 20:35:11 +0200 Subject: [PATCH 4/4] [IMP] queue_job: make sorting more explicit --- queue_job/jobrunner/runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index afaaa90200..3ffd7d035f 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -420,7 +420,7 @@ def get_db_names(self): db_names = config["db_name"].split(",") else: db_names = odoo.service.db.list_dbs(True) - return sorted(db_names) + return db_names def close_databases(self, remove_jobs=True): for db_name, db in self.db_by_name.items(): @@ -433,7 +433,8 @@ def close_databases(self, remove_jobs=True): self.db_by_name = {} def initialize_databases(self): - for db_name in self.get_db_names(): + for db_name in sorted(self.get_db_names()): + # sorting is important to avoid deadlocks in acquiring the master lock db = Database(db_name) if db.has_queue_job: self.db_by_name[db_name] = db