From 2be73003f28c626cce06b9bad2a2c1af4afa7417 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Fri, 13 Sep 2024 15:59:34 +0200 Subject: [PATCH 1/9] Add cpu_seconds to MemoryStatus report sent from the runner --- src/_ert/forward_model_runner/cli.py | 4 +- src/_ert/forward_model_runner/job.py | 23 +++-- .../forward_model_runner/reporting/file.py | 1 + .../forward_model_runner/reporting/message.py | 8 +- .../test_event_reporter.py | 28 +++--- .../test_file_reporter.py | 8 +- .../forward_model_runner/test_job.py | 95 ++++++++++++++++--- 7 files changed, 125 insertions(+), 42 deletions(-) diff --git a/src/_ert/forward_model_runner/cli.py b/src/_ert/forward_model_runner/cli.py index b50db013945..afa4aaa70e7 100644 --- a/src/_ert/forward_model_runner/cli.py +++ b/src/_ert/forward_model_runner/cli.py @@ -9,7 +9,7 @@ from datetime import datetime from _ert.forward_model_runner import reporting -from _ert.forward_model_runner.reporting.message import Finish, MemoryStatus +from _ert.forward_model_runner.reporting.message import Finish, ProcessTreeStatus from _ert.forward_model_runner.runner import ForwardModelRunner JOBS_FILE = "jobs.json" @@ -62,7 +62,7 @@ def _setup_logging(directory: str = "logs"): memory_csv_logger.addHandler(csv_handler) memory_csv_logger.setLevel(logging.INFO) # Write the CSV header to the file: - memory_csv_logger.info(MemoryStatus().csv_header()) + memory_csv_logger.info(ProcessTreeStatus().csv_header()) job_runner_logger.addHandler(handler) job_runner_logger.setLevel(logging.DEBUG) diff --git a/src/_ert/forward_model_runner/job.py b/src/_ert/forward_model_runner/job.py index c57c9c9a556..74179c58c4e 100644 --- a/src/_ert/forward_model_runner/job.py +++ b/src/_ert/forward_model_runner/job.py @@ -17,7 +17,7 @@ from .io import assert_file_executable from .reporting.message import ( Exited, - MemoryStatus, + ProcessTreeStatus, Running, Start, ) @@ -179,15 +179,16 @@ def ensure_file_handles_closed(): max_memory_usage = 0 while exit_code is None: - (memory_rss, oom_score) = _get_rss_and_oom_score_for_processtree(process) + (memory_rss, cpu_seconds, oom_score) = _get_processtree_data(process) max_memory_usage = max(memory_rss, max_memory_usage) yield Running( self, - MemoryStatus( + ProcessTreeStatus( rss=memory_rss, max_rss=max_memory_usage, fm_step_id=self.index, fm_step_name=self.job_data.get("name"), + cpu_seconds=cpu_seconds, oom_score=oom_score, ), ) @@ -349,9 +350,9 @@ def _check_target_file_is_written(self, target_file_mtime: int, timeout=5): return f"Could not find target_file:{target_file}" -def _get_rss_and_oom_score_for_processtree( +def _get_processtree_data( process: Process, -) -> Tuple[int, Optional[int]]: +) -> Tuple[int, float, Optional[int]]: """Obtain the oom_score (the Linux kernel uses this number to decide which process to kill first in out-of-memory siturations). @@ -370,14 +371,16 @@ def _get_rss_and_oom_score_for_processtree( oom_score = None # A value of None means that we have no information. memory_rss = 0 + cpu_seconds = 0.0 with contextlib.suppress(ValueError, FileNotFoundError): oom_score = int( Path(f"/proc/{process.pid}/oom_score").read_text(encoding="utf-8") ) with contextlib.suppress( ValueError, NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError - ): + ), process.oneshot(): memory_rss = process.memory_info().rss + cpu_seconds = process.cpu_times().user with contextlib.suppress( NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError @@ -399,7 +402,9 @@ def _get_rss_and_oom_score_for_processtree( if oom_score is not None else oom_score_child ) - with contextlib.suppress(NoSuchProcess, AccessDenied, ZombieProcess): + with contextlib.suppress( + NoSuchProcess, AccessDenied, ZombieProcess + ), child.oneshot(): memory_rss += child.memory_info().rss - - return (memory_rss, oom_score) + cpu_seconds += child.cpu_times().user + return (memory_rss, cpu_seconds, oom_score) diff --git a/src/_ert/forward_model_runner/reporting/file.py b/src/_ert/forward_model_runner/reporting/file.py index 4080067709c..1e1b3d4a65d 100644 --- a/src/_ert/forward_model_runner/reporting/file.py +++ b/src/_ert/forward_model_runner/reporting/file.py @@ -100,6 +100,7 @@ def report(self, msg: Message): job_status.update( max_memory_usage=msg.memory_status.max_rss, current_memory_usage=msg.memory_status.rss, + cpu_seconds=msg.memory_status.cpu_seconds, status=_JOB_STATUS_RUNNING, ) memory_logger.info(msg.memory_status) diff --git a/src/_ert/forward_model_runner/reporting/message.py b/src/_ert/forward_model_runner/reporting/message.py index dbd7ae07b84..ace3862a02b 100644 --- a/src/_ert/forward_model_runner/reporting/message.py +++ b/src/_ert/forward_model_runner/reporting/message.py @@ -34,8 +34,8 @@ class ChecksumDict(_ChecksumDictBase, total=False): @dataclasses.dataclass -class MemoryStatus: - """Holds memory information that can be represented as a line of CSV data""" +class ProcessTreeStatus: + """Holds processtree information that can be represented as a line of CSV data""" timestamp: str = "" fm_step_id: Optional[int] = None @@ -46,6 +46,8 @@ class MemoryStatus: max_rss: Optional[int] = None free: Optional[int] = None + cpu_seconds: float = 0.0 + oom_score: Optional[int] = None def __post_init__(self): @@ -119,7 +121,7 @@ def __init__(self, job): class Running(Message): - def __init__(self, job: "Job", memory_status: MemoryStatus): + def __init__(self, job: "Job", memory_status: ProcessTreeStatus): super().__init__(job) self.memory_status = memory_status diff --git a/tests/unit_tests/forward_model_runner/test_event_reporter.py b/tests/unit_tests/forward_model_runner/test_event_reporter.py index 71596fdee74..8030580c8c1 100644 --- a/tests/unit_tests/forward_model_runner/test_event_reporter.py +++ b/tests/unit_tests/forward_model_runner/test_event_reporter.py @@ -22,7 +22,7 @@ Exited, Finish, Init, - MemoryStatus, + ProcessTreeStatus, Running, Start, ) @@ -124,7 +124,7 @@ def test_report_with_running_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish()) assert len(lines) == 1 @@ -143,7 +143,7 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish()) assert len(lines) == 1 @@ -158,7 +158,7 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish().with_error("massive_failure")) assert len(lines) == 1 @@ -202,9 +202,9 @@ def mock_send(msg): "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) ): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=1100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=1100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10))) # set _stop_timestamp reporter.report(Finish()) if reporter._event_publisher_thread.is_alive(): @@ -240,9 +240,9 @@ def send_func(msg): patched_send.side_effect = send_func reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=200, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=300, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10))) _wait_until( condition=lambda: patched_send.call_count == 3, @@ -278,8 +278,8 @@ def mock_send(msg): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=200, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10))) # sleep until both Running events have been received _wait_until( @@ -291,14 +291,14 @@ def mock_send(msg): with patch( "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) ): - reporter.report(Running(job1, MemoryStatus(max_rss=300, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10))) # Make sure the publisher thread exits because it got # ClientConnectionClosedOK. If it hangs it could indicate that the # exception is not caught/handled correctly if reporter._event_publisher_thread.is_alive(): reporter._event_publisher_thread.join() - reporter.report(Running(job1, MemoryStatus(max_rss=400, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10))) reporter.report(Finish()) # set _stop_timestamp was not set to None since the reporter finished on time diff --git a/tests/unit_tests/forward_model_runner/test_file_reporter.py b/tests/unit_tests/forward_model_runner/test_file_reporter.py index ba9de832387..4ac53b95ee2 100644 --- a/tests/unit_tests/forward_model_runner/test_file_reporter.py +++ b/tests/unit_tests/forward_model_runner/test_file_reporter.py @@ -9,7 +9,7 @@ Exited, Finish, Init, - MemoryStatus, + ProcessTreeStatus, Running, Start, ) @@ -130,7 +130,10 @@ def test_report_with_failed_exit_message_argument(reporter): @pytest.mark.usefixtures("use_tmpdir") def test_report_with_running_message_argument(reporter): - msg = Running(Job({"name": "job1"}, 0), MemoryStatus(max_rss=100, rss=10)) + msg = Running( + Job({"name": "job1"}, 0), + ProcessTreeStatus(max_rss=100, rss=10, cpu_seconds=1.1), + ) reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job]) reporter.report(msg) @@ -144,6 +147,7 @@ def test_report_with_running_message_argument(reporter): assert ( '"current_memory_usage": 10' in content ), "status.json missing current_memory_usage" + assert '"cpu_seconds": 1.1' in content, "status.json missing cpu_seconds" @pytest.mark.usefixtures("use_tmpdir") diff --git a/tests/unit_tests/forward_model_runner/test_job.py b/tests/unit_tests/forward_model_runner/test_job.py index d217087061c..01b65aa6752 100644 --- a/tests/unit_tests/forward_model_runner/test_job.py +++ b/tests/unit_tests/forward_model_runner/test_job.py @@ -1,3 +1,4 @@ +import contextlib import os import pathlib import stat @@ -11,7 +12,7 @@ import numpy as np import pytest -from _ert.forward_model_runner.job import Job, _get_rss_and_oom_score_for_processtree +from _ert.forward_model_runner.job import Job, _get_processtree_data from _ert.forward_model_runner.reporting.message import Exited, Running, Start @@ -40,6 +41,56 @@ def test_run_with_process_failing( next(run) +@pytest.mark.flaky(reruns=5) +@pytest.mark.integration_test +def test_cpu_seconds_can_detect_multiprocess(): + """Run a job that sets of two simultaneous processes that + each run for 1 second. We should be able to detect the total + cpu seconds consumed to be roughly 2 seconds. + + The test is flaky in that it tries to gather cpu_seconds data while + the subprocesses are running. On a loaded CPU this is not very robust, + but the most important catch is to be able to obtain a cpu_second + number that is larger than the busy-wait times of the individual + sub-processes. + """ + pythonscript = "busy.py" + with open(pythonscript, "w", encoding="utf-8") as pyscript: + pyscript.write( + textwrap.dedent( + """\ + import time + now = time.time() + while time.time() < now + 1: + pass""" + ) + ) + scriptname = "saturate_cpus.sh" + with open(scriptname, "w", encoding="utf-8") as script: + script.write( + textwrap.dedent( + """\ + #!/bin/sh + python busy.py & + python busy.py""" + ) + ) + executable = os.path.realpath(scriptname) + os.chmod(scriptname, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + job = Job( + { + "executable": executable, + }, + 0, + ) + job.MEMORY_POLL_PERIOD = 0.1 + cpu_seconds = 0.0 + for status in job.run(): + if isinstance(status, Running): + cpu_seconds = max(cpu_seconds, status.memory_status.cpu_seconds) + assert 1.4 < cpu_seconds < 2.2 + + @pytest.mark.integration_test @pytest.mark.flaky(reruns=5) @pytest.mark.usefixtures("use_tmpdir") @@ -158,19 +209,39 @@ def test_memory_profile_in_running_events(): assert min(timedeltas).total_seconds() >= 0.01 -@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="No oom_score on MacOS") -def test_oom_score_is_max_over_processtree(monkeypatch): - @dataclass - class MockedProcess: - """A very lightweight mocked psutil.Process object""" +@dataclass +class CpuTimes: + """Mocks the response of psutil.Process().cpu_times()""" + + user: float - pid: int - memory_info = MagicMock() - def children(self, recursive: bool = True): - if self.pid == 123: - return [MockedProcess(124)] +@dataclass +class MockedProcess: + """Mocks psutil.Process()""" + pid: int + memory_info = MagicMock() + + def cpu_times(self): + return CpuTimes(user=self.pid / 10.0) + + def children(self, recursive: bool): + assert recursive + if self.pid == 123: + return [MockedProcess(124)] + + def oneshot(self): + return contextlib.nullcontext() + + +def test_cpu_seconds_for_process_with_children(): + (_, cpu_seconds, _) = _get_processtree_data(MockedProcess(123)) + assert cpu_seconds == 123 / 10.0 + 124 / 10.0 + + +@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="No oom_score on MacOS") +def test_oom_score_is_max_over_processtree(): def read_text_side_effect(self: pathlib.Path, *args, **kwargs): if self.absolute() == pathlib.Path("/proc/123/oom_score"): return "234" @@ -179,7 +250,7 @@ def read_text_side_effect(self: pathlib.Path, *args, **kwargs): with patch("pathlib.Path.read_text", autospec=True) as mocked_read_text: mocked_read_text.side_effect = read_text_side_effect - (_, oom_score) = _get_rss_and_oom_score_for_processtree(MockedProcess(123)) + (_, _, oom_score) = _get_processtree_data(MockedProcess(123)) assert oom_score == 456 From 751cbb1b92ce0d79ce56bad7cdb87810fc4d6bf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Wed, 18 Sep 2024 10:22:16 +0200 Subject: [PATCH 2/9] Add test for ecl license problem detection --- .../forward-models/res/script/ecl_run.py | 20 +++++++----- .../resources/test_ecl_run_new_config.py | 31 +++++++++++++++++++ 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/ert/resources/forward-models/res/script/ecl_run.py b/src/ert/resources/forward-models/res/script/ecl_run.py index 8f243c243ce..e5629f21f98 100644 --- a/src/ert/resources/forward-models/res/script/ecl_run.py +++ b/src/ert/resources/forward-models/res/script/ecl_run.py @@ -12,12 +12,13 @@ from random import random import resfo -from ecl_config import EclrunConfig +from ecl_config import EclConfig, EclrunConfig, Simulator from packaging import version class EclError(RuntimeError): - pass + def failed_due_to_license_problems(self) -> bool: + return "LICENSE ERROR" in self.args[0] or "LICENSE FAILURE" in self.args[0] def await_process_tee(process, *out_files) -> int: @@ -209,7 +210,12 @@ class EclRun: """ def __init__( - self, ecl_case, sim, num_cpu=1, check_status=True, summary_conversion=False + self, + ecl_case: str, + sim: Simulator, + num_cpu: int = 1, + check_status: bool = True, + summary_conversion: bool = False, ): self.sim = sim self.check_status = check_status @@ -392,10 +398,8 @@ def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None): try: self.assertECLEND() - except RuntimeError as err: - if ( - "LICENSE ERROR" in err.args[0] or "LICENSE FAILURE" in err.args[0] - ) and retries_left > 0: + except EclError as err: + if err.failed_due_to_license_problems() and retries_left > 0: time_to_wait = backoff_sleep + int( random() * self.LICENSE_RETRY_STAGGER_FACTOR ) @@ -515,7 +519,7 @@ def parseErrors(self): return error_list -def run(config, argv): +def run(config: EclConfig, argv): parser = ArgumentParser() parser.add_argument("ecl_case") parser.add_argument("-v", "--version", dest="version", type=str) diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index 4349dc0b571..0a378e733c6 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -267,3 +267,34 @@ def test_summary_block(source_root): erun.runEclipse(eclrun_config=ecl_config.EclrunConfig(econfig, "2019.3")) assert erun.summary_block() is not None + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl100_license_error_is_caught(): + prt_error = """\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ CHECKING FOR LICENSES + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ LICENSE ERROR -1 FOR MULTI-SEGMENT WELL OPTION + @ FEATURE IS INVALID. CHECK YOUR LICENSE FILE AND + @ THE LICENSE LOG FILE""" + eclend = """\ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.00 elapsed 0.08""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + try: + run.assertECLEND() + raise AssertionError("EclError not raised") + except ecl_run.EclError as err: + assert err.failed_due_to_license_problems() From 222229046e8bdaf4f1df1fbcdc99b86d95b31b3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Wed, 18 Sep 2024 10:36:23 +0200 Subject: [PATCH 3/9] Detect E300 license problems --- .../forward-models/res/script/ecl_run.py | 37 ++++++---- .../resources/test_ecl_run_new_config.py | 67 +++++++++++++++++++ 2 files changed, 90 insertions(+), 14 deletions(-) diff --git a/src/ert/resources/forward-models/res/script/ecl_run.py b/src/ert/resources/forward-models/res/script/ecl_run.py index e5629f21f98..b51044d8e97 100644 --- a/src/ert/resources/forward-models/res/script/ecl_run.py +++ b/src/ert/resources/forward-models/res/script/ecl_run.py @@ -10,6 +10,7 @@ from collections import namedtuple from contextlib import contextmanager, suppress from random import random +from typing import List import resfo from ecl_config import EclConfig, EclrunConfig, Simulator @@ -18,7 +19,11 @@ class EclError(RuntimeError): def failed_due_to_license_problems(self) -> bool: - return "LICENSE ERROR" in self.args[0] or "LICENSE FAILURE" in self.args[0] + return ( + "LICENSE ERROR" in self.args[0] + or "LICENSE FAILURE" in self.args[0] + or "not allowed in license" in self.args[0] + ) def await_process_tee(process, *out_files) -> int: @@ -53,7 +58,8 @@ def await_process_tee(process, *out_files) -> int: EclipseResult = namedtuple("EclipseResult", "errors bugs") body_sub_pattern = r"(\s^\s@.+$)*" date_sub_pattern = r"\s+AT TIME\s+(?P\d+\.\d+)\s+DAYS\s+\((?P(.+)):\s*$" -error_pattern = rf"^\s@-- ERROR{date_sub_pattern}${body_sub_pattern}" +error_pattern_e100 = rf"^\s@-- ERROR{date_sub_pattern}${body_sub_pattern}" +error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}" def make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS): @@ -498,23 +504,26 @@ def readECLEND(self): return EclipseResult(errors=errors, bugs=bugs) - def parseErrors(self): + def parseErrors(self) -> List[str]: + """Extract multiline ERROR messages from the PRT file""" prt_file = os.path.join(self.runPath(), f"{self.baseName()}.PRT") error_list = [] - error_regexp = re.compile(error_pattern, re.MULTILINE) + error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE) + error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE) with open(prt_file, "r", encoding="utf-8") as filehandle: content = filehandle.read() - offset = 0 - while True: - match = error_regexp.search(content[offset:]) - if match: - error_list.append( - content[offset + match.start() : offset + match.end()] - ) - offset += match.end() - else: - break + for regexp in [error_e100_regexp, error_e300_regexp]: + offset = 0 + while True: + match = regexp.search(content[offset:]) + if match: + error_list.append( + content[offset + match.start() : offset + match.end()] + ) + offset += match.end() + else: + break return error_list diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index 0a378e733c6..2e091f196b3 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -298,3 +298,70 @@ def test_ecl100_license_error_is_caught(): raise AssertionError("EclError not raised") except ecl_run.EclError as err: assert err.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl300_license_error_is_caught(): + prt_error = """\ + @--Message:The message service has been activated + @--Message:Checking for licenses + @--Message:Checking for licenses + @--Message:Checking for licenses + @--Error + @ ECLIPSE option not allowed in license + @ Please ask for a new license + @ Run stopping + 0 Mbytes of storage required + No active cells found + 249 Mbytes (image size) +""" + eclend = """\ + Error summary + Comments 1 + Warnings 2 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.01 elapsed 0.02 + Emergency stop called from routine ZSTOPE""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl300_crash_is_not_mistaken_as_license_trouble(): + prt_error = """\ + @--Message:The message service has been activated + @--Message:Checking for licenses + @--Message:Checking for licenses + @--Message:Checking for licenses + @ Run stopping + 0 Mbytes of storage required + No active cells found + 249 Mbytes (image size) +""" + eclend = """\ + Error summary + Comments 1 + Warnings 2 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.01 elapsed 0.02 + Emergency stop called from routine ZSTOPE""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert not exception_info.value.failed_due_to_license_problems() From 883a995952d3016ed4e4b0a0ac37cfc0c59e7fee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 19 Sep 2024 15:38:04 +0200 Subject: [PATCH 4/9] Add test for not mistaking E100 crashes for license --- .../resources/test_ecl_run_new_config.py | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index 2e091f196b3..229956547c4 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -293,11 +293,36 @@ def test_ecl100_license_error_is_caught(): Path("FOO.DATA").write_text("", encoding="utf-8") run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - try: + with pytest.raises(ecl_run.EclError) as exception_info: run.assertECLEND() - raise AssertionError("EclError not raised") - except ecl_run.EclError as err: - assert err.failed_due_to_license_problems() + assert exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl100_crash_is_not_mistaken_as_license_trouble(): + prt_error = """\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ CHECKING FOR LICENSES + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ NON-LINEAR CONVERGENCE FAILURE""" + eclend = """\ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.00 elapsed 0.08""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") From f960cfd8f4dc9b1ed6f1650f023dab6ba8e9b27b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 19 Sep 2024 14:46:11 +0200 Subject: [PATCH 5/9] Detect license trouble in slave Eclipse models In a coupled reservoir simulation, there is a master Eclipse process, which itself starts up Eclipse processes for its slaves. If the master process fails due to license trouble, it is caught by the existing code, but if the master passes but any of the slaves do not, the PRT files of the slaves must be parsed to deduce license failure or not. --- .../forward-models/res/script/ecl_run.py | 38 +++++- .../resources/test_ecl_run_new_config.py | 109 ++++++++++++++++++ 2 files changed, 141 insertions(+), 6 deletions(-) diff --git a/src/ert/resources/forward-models/res/script/ecl_run.py b/src/ert/resources/forward-models/res/script/ecl_run.py index b51044d8e97..371d68609a7 100644 --- a/src/ert/resources/forward-models/res/script/ecl_run.py +++ b/src/ert/resources/forward-models/res/script/ecl_run.py @@ -1,4 +1,5 @@ import datetime +import glob import os import os.path import re @@ -9,6 +10,7 @@ from argparse import ArgumentParser from collections import namedtuple from contextlib import contextmanager, suppress +from pathlib import Path from random import random from typing import List @@ -17,13 +19,30 @@ from packaging import version +def ecl_output_has_license_error(ecl_output: str): + return ( + "LICENSE ERROR" in ecl_output + or "LICENSE FAILURE" in ecl_output + or "not allowed in license" in ecl_output + ) + + class EclError(RuntimeError): def failed_due_to_license_problems(self) -> bool: - return ( - "LICENSE ERROR" in self.args[0] - or "LICENSE FAILURE" in self.args[0] - or "not allowed in license" in self.args[0] - ) + # self.args[0] contains the multiline ERROR messages and SLAVE startup messages + if ecl_output_has_license_error(self.args[0]): + return True + if re.search(a_slave_failed_pattern, self.args[0]): + for match in re.finditer(slave_run_paths, self.args[0], re.MULTILINE): + (ecl_case_starts_with, ecl_case_dir) = match.groups() + for prt_file in glob.glob( + f"{ecl_case_dir}/{ecl_case_starts_with}*.PRT" + ): + if ecl_output_has_license_error( + Path(prt_file).read_text(encoding="utf-8") + ): + return True + return False def await_process_tee(process, *out_files) -> int: @@ -60,6 +79,12 @@ def await_process_tee(process, *out_files) -> int: date_sub_pattern = r"\s+AT TIME\s+(?P\d+\.\d+)\s+DAYS\s+\((?P(.+)):\s*$" error_pattern_e100 = rf"^\s@-- ERROR{date_sub_pattern}${body_sub_pattern}" error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}" +slave_started_pattern = ( + rf"^\s@--MESSAGE{date_sub_pattern}\s^\s@\s+STARTING SLAVE.+${body_sub_pattern}" +) +a_slave_failed_pattern = r"\s@\s+SLAVE RUN.*HAS STOPPED WITH AN ERROR CONDITION.\s*" +slave_run_paths = r"^\s@\s+STARTING SLAVE\s+[^ ]+RUNNING \([^ ]\)\s*$" +slave_run_paths = r"\s@\s+STARTING SLAVE .* RUNNING (\w+)\s*^\s@\s+ON HOST.*IN DIRECTORY\s*^\s@\s+(.*)" def make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS): @@ -510,10 +535,11 @@ def parseErrors(self) -> List[str]: error_list = [] error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE) error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE) + slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE) with open(prt_file, "r", encoding="utf-8") as filehandle: content = filehandle.read() - for regexp in [error_e100_regexp, error_e300_regexp]: + for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]: offset = 0 while True: match = regexp.search(content[offset:]) diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index 229956547c4..6f33446b299 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -390,3 +390,112 @@ def test_ecl300_crash_is_not_mistaken_as_license_trouble(): with pytest.raises(ecl_run.EclError) as exception_info: run.assertECLEND() assert not exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_license_error_in_slave_is_caught(): + """If a coupled Eclipse model fails in one of the slave runs + due to license issues, there is no trace of licence in the master PRT file. + + The master PRT file must be trace for the paths to the SLAVE runs + and then those PRT files must be parsed. + + Note that the name of the DATA file is truncated in the MESSAGE listing + the slaves. + """ + Path("slave1").mkdir() + Path("slave2").mkdir() + master_prt_error = f"""\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE1 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave1 + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE2 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave2 + + + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ SLAVE RUN SLAVE2 HAS STOPPED WITH AN ERROR CONDITION. + @ MASTER RUN AND REMAINING SLAVES WILL ALSO STOP. + """ + master_eclend = """ + Error summary + Comments 1 + Warnings 1 + Problems 0 + Errors 1 + Bugs 0""" + + Path("EIGHTCELLS_MASTER.PRT").write_text( + master_prt_error + "\n" + master_eclend, encoding="utf-8" + ) + Path("EIGHTCELLS_MASTER.ECLEND").write_text(master_eclend, encoding="utf-8") + + slave_prt_error = """\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ LICENSE ERROR -15 FOR MULTI-SEGMENT WELL OPTION + @ FEATURE IS INVALID. CHECK YOUR LICENSE FILE AND + @ THE LICENSE LOG FILE + """ + Path("slave1/EIGHTCELLS_SLAVE.PRT").write_text("", encoding="utf-8") + Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") + Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_crash_in_slave_is_not_mistaken_as_license(): + Path("slave1").mkdir() + Path("slave2").mkdir() + master_prt_error = f"""\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE1 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave1 + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE2 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave2 + + + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ SLAVE RUN SLAVE2 HAS STOPPED WITH AN ERROR CONDITION. + @ MASTER RUN AND REMAINING SLAVES WILL ALSO STOP. + """ + master_eclend = """ + Error summary + Comments 1 + Warnings 1 + Problems 0 + Errors 1 + Bugs 0""" + + Path("EIGHTCELLS_MASTER.PRT").write_text( + master_prt_error + "\n" + master_eclend, encoding="utf-8" + ) + Path("EIGHTCELLS_MASTER.ECLEND").write_text(master_eclend, encoding="utf-8") + + slave_prt_error = """\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ NON-LINEAR CONVERGENCE FAILURE + """ + Path("slave1/EIGHTCELLS_SLAVE.PRT").write_text("", encoding="utf-8") + Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") + Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert not exception_info.value.failed_due_to_license_problems() From f182f56a640defb74604f000a34401af9cd6d2e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Mon, 23 Sep 2024 15:30:50 +0200 Subject: [PATCH 6/9] Warn when parsed PRT errors are inconsistent --- .../forward-models/res/script/ecl_run.py | 14 +++- .../resources/test_ecl_run_new_config.py | 81 +++++++++++++++++++ 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/src/ert/resources/forward-models/res/script/ecl_run.py b/src/ert/resources/forward-models/res/script/ecl_run.py index 371d68609a7..93dec112d2b 100644 --- a/src/ert/resources/forward-models/res/script/ecl_run.py +++ b/src/ert/resources/forward-models/res/script/ecl_run.py @@ -449,7 +449,6 @@ def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None): return else: raise err from None - if self.num_cpu > 1: self.summary_block() @@ -494,10 +493,19 @@ def assertECLEND(self): if result.errors > 0: error_list = self.parseErrors() sep = "\n\n...\n\n" - error_msg = sep.join(error_list) + error_and_slave_msg = sep.join(error_list) + extra_message = "" + error_messages = [ + error for error in error_list if not "STARTING SLAVE" in str(error) + ] + if result.errors != len(error_messages): + extra_message = ( + f"\n\nWarning, mismatch between stated Error count ({result.errors}) " + f"and number of ERROR messages found in PRT ({len(error_messages)})." + ) raise EclError( "Eclipse simulation failed with:" - f"{result.errors:d} errors:\n\n{error_msg}" + f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}" ) if result.bugs > 0: diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index 6f33446b299..b1e4222a676 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -499,3 +499,84 @@ def test_crash_in_slave_is_not_mistaken_as_license(): with pytest.raises(ecl_run.EclError) as exception_info: run.assertECLEND() assert not exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_too_few_parsed_error_messages_gives_warning(): + prt_error = """\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE""" + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "Warning, mismatch between stated Error count" in str(exception_info.value) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_correct_number_of_parsed_error_messages_gives_no_warning(): + prt_error = """\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS A DUMMY ERROR MESSAGE""" + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "Warning, mismatch between stated Error count" not in str( + exception_info.value + ) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_slave_started_message_are_not_counted_as_errors(): + prt_error = f"""\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS A DUMMY ERROR MESSAGE + + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE1 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave1""" + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "Warning, mismatch between stated Error count" not in str( + exception_info.value + ) From 063f709b3e1b75ba15e68ca98966d4723e3e1c2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Mon, 23 Sep 2024 15:47:16 +0200 Subject: [PATCH 7/9] Include tail of PRT file when ecl_run detect error count inconsistency --- .../forward-models/res/script/ecl_run.py | 25 ++++++++++++--- .../resources/test_ecl_run_new_config.py | 31 +++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/ert/resources/forward-models/res/script/ecl_run.py b/src/ert/resources/forward-models/res/script/ecl_run.py index 93dec112d2b..6975da5d632 100644 --- a/src/ert/resources/forward-models/res/script/ecl_run.py +++ b/src/ert/resources/forward-models/res/script/ecl_run.py @@ -280,6 +280,10 @@ def runPath(self): def baseName(self): return self.base_name + @property + def prt_path(self): + return Path(self.run_path) / (self.baseName() + ".PRT") + def numCpu(self): return self.num_cpu @@ -489,6 +493,7 @@ def summary_block(self): return ecl_sum def assertECLEND(self): + tail_length = 5000 result = self.readECLEND() if result.errors > 0: error_list = self.parseErrors() @@ -502,7 +507,9 @@ def assertECLEND(self): extra_message = ( f"\n\nWarning, mismatch between stated Error count ({result.errors}) " f"and number of ERROR messages found in PRT ({len(error_messages)})." - ) + f"\n\nTail ({tail_length} bytes) of PRT-file {self.prt_path}:\n\n" + ) + tail_textfile(self.prt_path, 5000) + raise EclError( "Eclipse simulation failed with:" f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}" @@ -517,7 +524,7 @@ def readECLEND(self): report_file = os.path.join(self.run_path, f"{self.base_name}.ECLEND") if not os.path.isfile(report_file): - report_file = os.path.join(self.run_path, f"{self.base_name}.PRT") + report_file = self.prt_path errors = None bugs = None @@ -539,12 +546,11 @@ def readECLEND(self): def parseErrors(self) -> List[str]: """Extract multiline ERROR messages from the PRT file""" - prt_file = os.path.join(self.runPath(), f"{self.baseName()}.PRT") error_list = [] error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE) error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE) slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE) - with open(prt_file, "r", encoding="utf-8") as filehandle: + with open(self.prt_path, "r", encoding="utf-8") as filehandle: content = filehandle.read() for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]: @@ -604,3 +610,14 @@ def run(config: EclConfig, argv): except EclError as msg: print(msg, file=sys.stderr) sys.exit(-1) + + +def tail_textfile(file_path: Path, num_chars: int) -> str: + if not file_path.exists(): + return f"No output file {file_path}" + with open(file_path, encoding="utf-8") as file: + file.seek(0, 2) + file_end_position = file.tell() + seek_position = max(0, file_end_position - num_chars) + file.seek(seek_position) + return file.read()[-num_chars:] diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index b1e4222a676..59ff4ccaa51 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -525,6 +525,37 @@ def test_too_few_parsed_error_messages_gives_warning(): assert "Warning, mismatch between stated Error count" in str(exception_info.value) +@pytest.mark.usefixtures("use_tmpdir") +def test_tail_of_prt_file_is_included_when_error_count_inconsistency(): + prt_error = ( + "this_should_not_be_included " + + "\n" * 10000 + + """ + this_should_be_included + + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE""" + ) + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "this_should_be_included" in str(exception_info.value) + assert "this_should_not_be_included" not in str(exception_info.value) + + @pytest.mark.usefixtures("use_tmpdir") def test_correct_number_of_parsed_error_messages_gives_no_warning(): prt_error = """\ From 7c75370dcae308948363c63d06238d35cecf65a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Tue, 17 Sep 2024 07:40:16 +0200 Subject: [PATCH 8/9] Let ensemble evaluator evaluate NUM_CPU inconsistencies If the forward model runner reports a cpu-time that exceeds duration times NUM_CPU, it implies that the user has misconfigured something, typically NUM_CPU, or that the forward model does not respect NUM_CPU. --- src/_ert/events.py | 1 + .../forward_model_runner/reporting/event.py | 1 + src/ert/ensemble_evaluator/evaluator.py | 33 +++++++++++++++- src/ert/ensemble_evaluator/identifiers.py | 1 + src/ert/ensemble_evaluator/snapshot.py | 2 + .../test_ensemble_evaluator.py | 38 ++++++++++++++++++- 6 files changed, 74 insertions(+), 2 deletions(-) diff --git a/src/_ert/events.py b/src/_ert/events.py index 0f810286f9e..99033c7e833 100644 --- a/src/_ert/events.py +++ b/src/_ert/events.py @@ -85,6 +85,7 @@ class ForwardModelStepRunning(ForwardModelStepBaseEvent): event_type: Id.FORWARD_MODEL_STEP_RUNNING_TYPE = Id.FORWARD_MODEL_STEP_RUNNING max_memory_usage: Union[int, None] = None current_memory_usage: Union[int, None] = None + cpu_seconds: float = 0.0 class ForwardModelStepSuccess(ForwardModelStepBaseEvent): diff --git a/src/_ert/forward_model_runner/reporting/event.py b/src/_ert/forward_model_runner/reporting/event.py index ece3d893408..8bf13dee238 100644 --- a/src/_ert/forward_model_runner/reporting/event.py +++ b/src/_ert/forward_model_runner/reporting/event.py @@ -173,6 +173,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]): **job_msg, max_memory_usage=msg.memory_status.max_rss, current_memory_usage=msg.memory_status.rss, + cpu_seconds=msg.memory_status.cpu_seconds, ) self._dump_event(event) diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index bb7f91cc05e..4afe8e97971 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import traceback from contextlib import asynccontextmanager, contextmanager @@ -47,6 +48,7 @@ ) from ert.ensemble_evaluator import identifiers as ids +from ._ensemble import FMStepSnapshot from ._ensemble import LegacyEnsemble as Ensemble from .config import EvaluatorServerConfig from .snapshot import EnsembleSnapshot @@ -161,12 +163,20 @@ async def _stopped_handler(self, events: Sequence[EnsembleSucceeded]) -> None: return max_memory_usage = -1 - for fm_step in self.ensemble.snapshot.get_all_fm_steps().values(): + for (real_id, _), fm_step in self.ensemble.snapshot.get_all_fm_steps().items(): + # Infer max memory usage memory_usage = fm_step.get(ids.MAX_MEMORY_USAGE) or "-1" max_memory_usage = max(int(memory_usage), max_memory_usage) + + if cpu_message := detect_overspent_cpu( + self.ensemble.reals[int(real_id)].num_cpu, real_id, fm_step + ): + logger.warning(cpu_message) + logger.info( f"Ensemble ran with maximum memory usage for a single realization job: {max_memory_usage}" ) + await self._append_message(self.ensemble.update_snapshot(events)) async def _cancelled_handler(self, events: Sequence[EnsembleCancelled]) -> None: @@ -425,3 +435,24 @@ async def run_and_get_successful_realizations(self) -> List[int]: def _get_ens_id(source: str) -> str: # the ens_id will be found at /ert/ensemble/ens_id/... return source.split("/")[3] + + +def detect_overspent_cpu(num_cpu: int, real_id: str, fm_step: FMStepSnapshot) -> str: + """Produces a message warning about misconfiguration of NUM_CPU if + so is detected. Returns an empty string if everything is ok.""" + now = datetime.datetime.now() + duration = ( + (fm_step.get(ids.END_TIME) or now) - (fm_step.get(ids.START_TIME) or now) + ).total_seconds() + if duration <= 0: + return "" + cpu_seconds = fm_step.get(ids.CPU_SECONDS) or 0.0 + parallelization_obtained = cpu_seconds / duration + if parallelization_obtained > num_cpu: + return ( + f"Misconfigured NUM_CPU, forward model step '{fm_step.get(ids.NAME)}' for " + f"realization {real_id} spent {cpu_seconds} cpu seconds " + f"with wall clock duration {duration:.1f} seconds, " + f"a factor of {parallelization_obtained:.2f}, while NUM_CPU was {num_cpu}." + ) + return "" diff --git a/src/ert/ensemble_evaluator/identifiers.py b/src/ert/ensemble_evaluator/identifiers.py index 31cccb84be3..918b11779ad 100644 --- a/src/ert/ensemble_evaluator/identifiers.py +++ b/src/ert/ensemble_evaluator/identifiers.py @@ -5,6 +5,7 @@ ERROR: Final = "error" INDEX: Final = "index" MAX_MEMORY_USAGE: Final = "max_memory_usage" +CPU_SECONDS: Final = "cpu_seconds" NAME: Final = "name" START_TIME: Final = "start_time" STATUS: Final = "status" diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py index 00448fbd728..35e0157944f 100644 --- a/src/ert/ensemble_evaluator/snapshot.py +++ b/src/ert/ensemble_evaluator/snapshot.py @@ -346,6 +346,7 @@ def update_from_event( if type(event) is ForwardModelStepRunning: fm["current_memory_usage"] = event.current_memory_usage fm["max_memory_usage"] = event.max_memory_usage + fm["cpu_seconds"] = event.cpu_seconds if type(event) is ForwardModelStepStart: fm["stdout"] = event.std_out fm["stderr"] = event.std_err @@ -384,6 +385,7 @@ class FMStepSnapshot(TypedDict, total=False): index: Optional[str] current_memory_usage: Optional[int] max_memory_usage: Optional[int] + cpu_seconds: Optional[float] name: Optional[str] error: Optional[str] stdout: Optional[str] diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index 11f01658b84..668527b6017 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -1,8 +1,11 @@ import asyncio +import datetime from functools import partial from typing import cast import pytest +from hypothesis import given +from hypothesis import strategies as st from _ert.events import ( EESnapshot, @@ -17,7 +20,13 @@ event_to_json, ) from _ert.forward_model_runner.client import Client -from ert.ensemble_evaluator import EnsembleEvaluator, EnsembleSnapshot, Monitor +from ert.ensemble_evaluator import ( + EnsembleEvaluator, + EnsembleSnapshot, + FMStepSnapshot, + Monitor, +) +from ert.ensemble_evaluator.evaluator import detect_overspent_cpu from ert.ensemble_evaluator.state import ( ENSEMBLE_STATE_STARTED, ENSEMBLE_STATE_UNKNOWN, @@ -465,3 +474,30 @@ async def test_ensure_multi_level_events_in_order(evaluator_to_use): if "reals" in event.snapshot: assert ensemble_state == ENSEMBLE_STATE_STARTED ensemble_state = event.snapshot.get("status", ensemble_state) + + +@given( + num_cpu=st.integers(min_value=1, max_value=64), + start=st.datetimes(), + duration=st.integers(min_value=-1, max_value=10000), + cpu_seconds=st.floats(min_value=0), +) +def test_overspent_cpu_is_logged( + num_cpu: int, + start: datetime.datetime, + duration: int, + cpu_seconds: float, +): + message = detect_overspent_cpu( + num_cpu, + "dummy", + FMStepSnapshot( + start_time=start, + end_time=start + datetime.timedelta(seconds=duration), + cpu_seconds=cpu_seconds, + ), + ) + if duration > 0 and cpu_seconds / duration > num_cpu: + assert "Misconfigured NUM_CPU" in message + else: + assert "NUM_CPU" not in message From a6b282aee48a3d76056cf23dd883b278ce958c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Tue, 24 Sep 2024 16:05:12 +0200 Subject: [PATCH 9/9] Avoid bsub retrying on known error messages Contrary to earlier belief in the codebase, returncode 255 does not only mean flaky ssh connection. In order to no retry on known error scenarios, we must detect them explicitly by string matcing. --- src/ert/scheduler/driver.py | 14 +++++-- src/ert/scheduler/lsf_driver.py | 4 +- tests/unit_tests/scheduler/test_lsf_driver.py | 39 +++++++++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/ert/scheduler/driver.py b/src/ert/scheduler/driver.py index a5a9f124d14..f83071ed6c7 100644 --- a/src/ert/scheduler/driver.py +++ b/src/ert/scheduler/driver.py @@ -83,7 +83,8 @@ async def _execute_with_retry( total_attempts: int = 1, retry_interval: float = 1.0, driverlogger: Optional[logging.Logger] = None, - exit_on_msgs: Iterable[str] = (), + return_on_msgs: Iterable[str] = (), + error_on_msgs: Iterable[str] = (), log_to_debug: Optional[bool] = True, ) -> Tuple[bool, str]: _logger = driverlogger or logging.getLogger(__name__) @@ -121,11 +122,16 @@ async def _execute_with_retry( f'Command "{shlex.join(cmd_with_args)}" succeeded with {outputs}' ) return True, stdout.decode(errors="ignore").strip() - elif exit_on_msgs and any( - exit_on_msg in stderr.decode(errors="ignore") - for exit_on_msg in exit_on_msgs + elif return_on_msgs and any( + return_on_msg in stderr.decode(errors="ignore") + for return_on_msg in return_on_msgs ): return True, stderr.decode(errors="ignore").strip() + elif error_on_msgs and any( + error_on_msg in stderr.decode(errors="ignore") + for error_on_msg in error_on_msgs + ): + return False, stderr.decode(errors="ignore").strip() elif process.returncode in retry_codes: error_message = outputs elif process.returncode in accept_codes: diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index cd2f49a9ca6..7c7e364a2dc 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -94,6 +94,7 @@ class RunningJob: LSF_INFO_JSON_FILENAME = "lsf_info.json" FLAKY_SSH_RETURNCODE = 255 JOB_ALREADY_FINISHED_BKILL_MSG = "Job has already finished" +BSUB_FAILURE_MESSAGES = ("Job not submitted",) def _parse_jobs_dict(jobs: Mapping[str, JobState]) -> dict[str, AnyJob]: @@ -340,6 +341,7 @@ async def submit( retry_codes=(FLAKY_SSH_RETURNCODE,), total_attempts=self._bsub_retries, retry_interval=self._sleep_time_between_cmd_retries, + error_on_msgs=BSUB_FAILURE_MESSAGES, ) if not process_success: self._job_error_message_by_iens[iens] = process_message @@ -392,7 +394,7 @@ async def kill(self, iens: int) -> None: retry_codes=(FLAKY_SSH_RETURNCODE,), total_attempts=3, retry_interval=self._sleep_time_between_cmd_retries, - exit_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG), + return_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG), ) await asyncio.create_subprocess_shell( f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL {job_id}", diff --git a/tests/unit_tests/scheduler/test_lsf_driver.py b/tests/unit_tests/scheduler/test_lsf_driver.py index 9c2cbe9cbd2..bd701b4a203 100644 --- a/tests/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/unit_tests/scheduler/test_lsf_driver.py @@ -560,6 +560,45 @@ async def test_that_bsub_will_retry_and_fail( await driver.submit(0, "sleep 10") +@pytest.mark.parametrize( + ("exit_code, error_msg"), + [ + # All these have been manually obtained on the command line by perturbing the command arguments to bsub: + (255, "No such queue. Job not submitted"), + (255, "Too many processors requested. Job not submitted."), + (255, 'Error near "select" : duplicate section. Job not submitted.'), + ( + 255, + "Error in select section: Expected number, string, " + 'name, or "(" but found end of section. Job not submitted.', + ), + ( + 255, + "Error with :" + " '&' cannot be used in the resource requirement section. Job not submitted.", + ), + (255, "Error in rusage section. Job not submitted."), + (255, "Job not submitted."), + ], +) +async def test_that_bsub_will_fail_without_retries( + monkeypatch, tmp_path, exit_code, error_msg +): + monkeypatch.chdir(tmp_path) + bin_path = Path("bin") + bin_path.mkdir() + monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}") + bsub_path = bin_path / "bsub" + bsub_path.write_text( + f'#!/bin/sh\necho . >> bsubcalls\necho "{error_msg}" >&2\nexit {exit_code}' + ) + bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC) + driver = LsfDriver() + with pytest.raises(RuntimeError): + await driver.submit(0, "sleep 10") + assert len(Path("bsubcalls").read_text(encoding="utf-8").strip()) == 1 + + @pytest.mark.parametrize( ("exit_code, error_msg"), [