Skip to content

Commit

Permalink
Use asyncio.Lock for forward_model_ok
Browse files Browse the repository at this point in the history
This makes sure that we will not run more than 1 internalization job at a time.
  • Loading branch information
xjules committed Aug 23, 2024
1 parent 31d3aa2 commit 520380e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
30 changes: 19 additions & 11 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,25 @@ async def _verify_checksum(self, timeout: int = 120) -> None: # noqa: ASYNC109
logger.error(f"Disk synchronization failed for {file_path}")

async def _handle_finished_forward_model(self) -> None:
    """Run the done-callback for a finished forward model, one job at a time.

    The callback (``forward_model_ok``) is awaited while holding the
    scheduler-wide ``_fmok_lock`` whenever the scheduler has created one,
    so that no more than one job performs internalization at a time. The
    job state is sent while the lock is still held.
    """
    # Read the lock exactly once so that acquisition and release are
    # guaranteed to operate on the same object, even if the scheduler
    # attribute is rebound while this coroutine is suspended.
    fmok_lock = self._scheduler._fmok_lock
    if fmok_lock is None:
        # The scheduler has not set up a lock; run unguarded.
        await self._report_forward_model_status()
        return

    # ``async with`` releases the lock on success, error and cancellation.
    async with fmok_lock:
        await self._report_forward_model_status()


async def _report_forward_model_status(self) -> None:
    """Await ``forward_model_ok``, record its message and send the job state.

    Sends ``JobState.COMPLETED`` when the callback reports
    ``LoadStatus.LOAD_SUCCESSFUL`` and ``JobState.FAILED`` otherwise.
    """
    callback_status, status_msg = await forward_model_ok(self.real.run_arg)

    # NOTE(review): this overwrites an existing message and appends to an
    # empty one, which looks inverted. Behaviour is kept as-is because the
    # initial value and consumers of _callback_status_msg are not visible
    # here; confirm the intent before changing it.
    if self._callback_status_msg:
        self._callback_status_msg = status_msg
    else:
        self._callback_status_msg += f"\nstatus from done callback: {status_msg}"

    if callback_status == LoadStatus.LOAD_SUCCESSFUL:
        await self._send(JobState.COMPLETED)
    else:
        # Any status other than success is expected to be a load failure.
        assert callback_status == LoadStatus.LOAD_FAILURE
        await self._send(JobState.FAILED)

async def _handle_failure(self) -> None:
assert self._requested_max_submit is not None
Expand Down
4 changes: 4 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def __init__(
self._average_job_runtime: float = 0
self._completed_jobs_num: int = 0
self.completed_jobs: asyncio.Queue[int] = asyncio.Queue()
self._fmok_lock: Optional[asyncio.Lock] = None

self._cancelled = False
if max_submit < 0:
Expand Down Expand Up @@ -275,6 +276,9 @@ async def execute(
scheduling_tasks.append(asyncio.create_task(self._update_avg_job_runtime()))

sem = asyncio.BoundedSemaphore(self._max_running or len(self._jobs))
# this lock ensures that no more than one task
# does internalization at a time
self._fmok_lock = asyncio.Lock()
for iens, job in self._jobs.items():
self._job_tasks[iens] = asyncio.create_task(
job.run(sem, self._max_submit), name=f"job-{iens}_task"
Expand Down

0 comments on commit 520380e

Please sign in to comment.