From 317f16780d646f4979e2dcb6b663559a70822482 Mon Sep 17 00:00:00 2001 From: xjules Date: Fri, 23 Aug 2024 14:58:13 +0200 Subject: [PATCH] Use asyncio.Lock for forward_model_ok This makes sure that we will not run more than 1 internalization job at a time. --- src/ert/scheduler/job.py | 3 ++- src/ert/scheduler/scheduler.py | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index c48f576dd67..f524a8971b4 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -147,7 +147,8 @@ async def run(self, sem: asyncio.BoundedSemaphore, max_submit: int = 1) -> None: if self.returncode.result() == 0: if self._scheduler._manifest_queue is not None: await self._verify_checksum() - await self._handle_finished_forward_model() + async with self._scheduler._forward_model_ok_lock: + await self._handle_finished_forward_model() break if attempt < max_submit - 1: diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index 71ac4213c15..9982a7d935f 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -106,6 +106,10 @@ def __init__( self._completed_jobs_num: int = 0 self.completed_jobs: asyncio.Queue[int] = asyncio.Queue() + # this lock is to assure that no more than 1 task + # does internalization at a time + self._forward_model_ok_lock: asyncio.Lock = asyncio.Lock() + self._cancelled = False if max_submit < 0: raise ValueError(