Skip to content

Commit

Permalink
Have fm_runner's event reporter shut down gracefully
Browse files Browse the repository at this point in the history
This commit fixes the issue where the logs would be spammed with errors
related to the websocket client being forcefully shut down before
closing the connection.
It also fixes the issue where the fm_runner was not killing the running
forward models when SIGTERM was received.
  • Loading branch information
jonathan-eq committed Nov 27, 2024
1 parent 8f8a329 commit 9a89ae3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
20 changes: 15 additions & 5 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def main(args):
)

job_runner = ForwardModelRunner(jobs_data)

signal.signal(signal.SIGTERM, lambda _, __: _stop_reporters_and_sigkill(reporters))
for job_status in job_runner.run(parsed_args.job):
logger.info(f"Job status: {job_status}")
for reporter in reporters:
Expand All @@ -147,9 +147,19 @@ def main(args):
print(
f"job_dispatch failed due to {oserror}. Stopping and cleaning up."
)
pgid = os.getpgid(os.getpid())
os.killpg(pgid, signal.SIGKILL)
_stop_reporters_and_sigkill(reporters)

if isinstance(job_status, Finish) and not job_status.success():
pgid = os.getpgid(os.getpid())
os.killpg(pgid, signal.SIGKILL)
_stop_reporters_and_sigkill(reporters)


def _stop_reporters_and_sigkill(reporters):
    """Shut down the event reporters cleanly, then SIGKILL the process group.

    Stopping the reporters first gives the websocket client a chance to
    close its connection before the entire process group (this process
    included, along with any running forward models) is forcefully killed.
    """
    _stop_reporters(reporters)
    os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)


def _stop_reporters(reporters: typing.Iterable[reporting.Reporter]) -> None:
    """Request a graceful shutdown of every Event reporter in *reporters*.

    Reporters of other types are left untouched; only ``reporting.Event``
    instances expose the ``stop()`` used here.
    """
    for reporter in reporters:
        if not isinstance(reporter, reporting.Event):
            continue
        reporter.stop()
17 changes: 10 additions & 7 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ def __init__(self, evaluator_url, token=None, cert_path=None):
# seconds to timeout the reporter the thread after Finish() was received
self._reporter_timeout = 60

def stop(self) -> None:
    """Signal the publisher thread to finish and wait for it to exit.

    Enqueues the sentinel so the publisher drains remaining events and
    terminates, arms the reporter timeout so shutdown cannot hang
    indefinitely, then joins the publisher thread.
    """
    # Sentinel tells the publisher loop there are no more events coming.
    self._event_queue.put(Event._sentinel)
    # Arm the shutdown deadline under the lock; the publisher thread reads
    # this timestamp to know when to give up draining the queue.
    with self._timestamp_lock:
        self._timeout_timestamp = datetime.now() + timedelta(
            seconds=self._reporter_timeout
        )
    # Join only if the thread was actually started/still running, so stop()
    # is safe to call more than once.
    if self._event_publisher_thread.is_alive():
        self._event_publisher_thread.join()

def _event_publisher(self):
logger.debug("Publishing event.")
with Client(
Expand Down Expand Up @@ -178,13 +187,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]):
self._dump_event(event)

def _finished_handler(self, _):
self._event_queue.put(Event._sentinel)
with self._timestamp_lock:
self._timeout_timestamp = datetime.now() + timedelta(
seconds=self._reporter_timeout
)
if self._event_publisher_thread.is_alive():
self._event_publisher_thread.join()
self.stop()

def _checksum_handler(self, msg: Checksum):
fm_checksum = ForwardModelStepChecksum(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,16 +306,19 @@ def test_retry_of_jobs_json_file_read(unused_tcp_port, tmp_path, monkeypatch, ca
}
)

with _mock_ws_thread("localhost", unused_tcp_port, []):
thread = ErtThread(target=main, args=[["script.py", str(tmp_path)]])
thread.start()
def create_jobs_file_after_lock():
    """Test helper: write jobs.json only after main() has logged a retry.

    Runs on a side thread while main() runs in the test's main thread.
    Closes over caplog, tmp_path, jobs_json, and lock from the enclosing
    test function (not fully visible in this hunk).
    """
    # Wait (up to 2s — presumably seconds; confirm _wait_until's contract)
    # until main() reports that jobs.json is missing, proving the retry
    # path was exercised before the file appears.
    _wait_until(
        lambda: f"Could not find file {JOBS_FILE}, retrying" in caplog.text,
        2,
        "Did not get expected log message from missing jobs.json",
    )
    # Now provide the file main() is retrying for, and release the lock so
    # the blocked party can proceed.
    (tmp_path / JOBS_FILE).write_text(jobs_json)
    lock.release()

with _mock_ws_thread("localhost", unused_tcp_port, []):
thread = ErtThread(target=create_jobs_file_after_lock)
thread.start()
main(args=["script.py", str(tmp_path)])
thread.join()


Expand Down

0 comments on commit 9a89ae3

Please sign in to comment.