diff --git a/src/_ert/events.py b/src/_ert/events.py index 0f810286f9e..99033c7e833 100644 --- a/src/_ert/events.py +++ b/src/_ert/events.py @@ -85,6 +85,7 @@ class ForwardModelStepRunning(ForwardModelStepBaseEvent): event_type: Id.FORWARD_MODEL_STEP_RUNNING_TYPE = Id.FORWARD_MODEL_STEP_RUNNING max_memory_usage: Union[int, None] = None current_memory_usage: Union[int, None] = None + cpu_seconds: float = 0.0 class ForwardModelStepSuccess(ForwardModelStepBaseEvent): diff --git a/src/_ert/forward_model_runner/cli.py b/src/_ert/forward_model_runner/cli.py index b50db013945..afa4aaa70e7 100644 --- a/src/_ert/forward_model_runner/cli.py +++ b/src/_ert/forward_model_runner/cli.py @@ -9,7 +9,7 @@ from datetime import datetime from _ert.forward_model_runner import reporting -from _ert.forward_model_runner.reporting.message import Finish, MemoryStatus +from _ert.forward_model_runner.reporting.message import Finish, ProcessTreeStatus from _ert.forward_model_runner.runner import ForwardModelRunner JOBS_FILE = "jobs.json" @@ -62,7 +62,7 @@ def _setup_logging(directory: str = "logs"): memory_csv_logger.addHandler(csv_handler) memory_csv_logger.setLevel(logging.INFO) # Write the CSV header to the file: - memory_csv_logger.info(MemoryStatus().csv_header()) + memory_csv_logger.info(ProcessTreeStatus().csv_header()) job_runner_logger.addHandler(handler) job_runner_logger.setLevel(logging.DEBUG) diff --git a/src/_ert/forward_model_runner/job.py b/src/_ert/forward_model_runner/job.py index c57c9c9a556..74179c58c4e 100644 --- a/src/_ert/forward_model_runner/job.py +++ b/src/_ert/forward_model_runner/job.py @@ -17,7 +17,7 @@ from .io import assert_file_executable from .reporting.message import ( Exited, - MemoryStatus, + ProcessTreeStatus, Running, Start, ) @@ -179,15 +179,16 @@ def ensure_file_handles_closed(): max_memory_usage = 0 while exit_code is None: - (memory_rss, oom_score) = _get_rss_and_oom_score_for_processtree(process) + (memory_rss, cpu_seconds, oom_score) = _get_processtree_data(process) max_memory_usage = max(memory_rss, max_memory_usage) yield Running( self, - MemoryStatus( + ProcessTreeStatus( rss=memory_rss, max_rss=max_memory_usage, fm_step_id=self.index, fm_step_name=self.job_data.get("name"), + cpu_seconds=cpu_seconds, oom_score=oom_score, ), ) @@ -349,9 +350,9 @@ def _check_target_file_is_written(self, target_file_mtime: int, timeout=5): return f"Could not find target_file:{target_file}" -def _get_rss_and_oom_score_for_processtree( +def _get_processtree_data( process: Process, -) -> Tuple[int, Optional[int]]: +) -> Tuple[int, float, Optional[int]]: """Obtain the oom_score (the Linux kernel uses this number to decide which process to kill first in out-of-memory siturations). @@ -370,14 +371,16 @@ def _get_rss_and_oom_score_for_processtree( oom_score = None # A value of None means that we have no information. 
memory_rss = 0 + cpu_seconds = 0.0 with contextlib.suppress(ValueError, FileNotFoundError): oom_score = int( Path(f"/proc/{process.pid}/oom_score").read_text(encoding="utf-8") ) with contextlib.suppress( ValueError, NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError - ): + ), process.oneshot(): memory_rss = process.memory_info().rss + cpu_seconds = process.cpu_times().user with contextlib.suppress( NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError @@ -399,7 +402,9 @@ def _get_rss_and_oom_score_for_processtree( if oom_score is not None else oom_score_child ) - with contextlib.suppress(NoSuchProcess, AccessDenied, ZombieProcess): + with contextlib.suppress( + NoSuchProcess, AccessDenied, ZombieProcess + ), child.oneshot(): memory_rss += child.memory_info().rss - - return (memory_rss, oom_score) + cpu_seconds += child.cpu_times().user + return (memory_rss, cpu_seconds, oom_score) diff --git a/src/_ert/forward_model_runner/reporting/event.py b/src/_ert/forward_model_runner/reporting/event.py index ece3d893408..8bf13dee238 100644 --- a/src/_ert/forward_model_runner/reporting/event.py +++ b/src/_ert/forward_model_runner/reporting/event.py @@ -173,6 +173,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]): **job_msg, max_memory_usage=msg.memory_status.max_rss, current_memory_usage=msg.memory_status.rss, + cpu_seconds=msg.memory_status.cpu_seconds, ) self._dump_event(event) diff --git a/src/_ert/forward_model_runner/reporting/file.py b/src/_ert/forward_model_runner/reporting/file.py index 4080067709c..1e1b3d4a65d 100644 --- a/src/_ert/forward_model_runner/reporting/file.py +++ b/src/_ert/forward_model_runner/reporting/file.py @@ -100,6 +100,7 @@ def report(self, msg: Message): job_status.update( max_memory_usage=msg.memory_status.max_rss, current_memory_usage=msg.memory_status.rss, + cpu_seconds=msg.memory_status.cpu_seconds, status=_JOB_STATUS_RUNNING, ) memory_logger.info(msg.memory_status) diff --git a/src/_ert/forward_model_runner/reporting/message.py b/src/_ert/forward_model_runner/reporting/message.py index dbd7ae07b84..ace3862a02b 100644 --- a/src/_ert/forward_model_runner/reporting/message.py +++ b/src/_ert/forward_model_runner/reporting/message.py @@ -34,8 +34,8 @@ class ChecksumDict(_ChecksumDictBase, total=False): @dataclasses.dataclass -class MemoryStatus: - """Holds memory information that can be represented as a line of CSV data""" +class ProcessTreeStatus: + """Holds processtree information that can be represented as a line of CSV data""" timestamp: str = "" fm_step_id: Optional[int] = None @@ -46,6 +46,8 @@ class MemoryStatus: max_rss: Optional[int] = None free: Optional[int] = None + cpu_seconds: float = 0.0 + oom_score: Optional[int] = None def __post_init__(self): @@ -119,7 +121,7 @@ def __init__(self, job): class Running(Message): - def __init__(self, job: "Job", memory_status: MemoryStatus): + def __init__(self, job: "Job", memory_status: ProcessTreeStatus): super().__init__(job) self.memory_status = memory_status diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index bb7f91cc05e..4afe8e97971 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import traceback from contextlib import asynccontextmanager, contextmanager @@ -47,6 +48,7 @@ ) from ert.ensemble_evaluator import identifiers as ids +from ._ensemble import FMStepSnapshot from ._ensemble import LegacyEnsemble as Ensemble from 
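Editor's note: the `_get_processtree_data` helper above gathers RSS, user-mode CPU seconds and the oom_score across the whole process tree. As a standalone reference, here is a minimal sketch of that psutil pattern (illustrative only, not the production helper): `oneshot()` batches the per-process reads, and the usual races with processes exiting mid-poll are suppressed.

```python
# Minimal, illustrative sketch of the psutil calls used by _get_processtree_data;
# not the production helper.
import contextlib
import os
from typing import Tuple

from psutil import AccessDenied, NoSuchProcess, Process, ZombieProcess


def sample_process_tree(pid: int) -> Tuple[int, float]:
    """Sum RSS bytes and user-mode CPU seconds over a process and its children."""
    rss, cpu_seconds = 0, 0.0
    process = Process(pid)
    children = []
    with contextlib.suppress(NoSuchProcess, AccessDenied, ZombieProcess):
        children = process.children(recursive=True)
    for proc in [process, *children]:
        # Processes can exit between listing and reading; ignore those races.
        with contextlib.suppress(
            NoSuchProcess, AccessDenied, ZombieProcess
        ), proc.oneshot():
            rss += proc.memory_info().rss
            cpu_seconds += proc.cpu_times().user
    return rss, cpu_seconds


print(sample_process_tree(os.getpid()))  # e.g. (24813568, 0.03)
```

Summing `cpu_times().user` over the whole tree is what later lets the evaluator compare CPU time against wall-clock time per forward-model step.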
.config import EvaluatorServerConfig from .snapshot import EnsembleSnapshot @@ -161,12 +163,20 @@ async def _stopped_handler(self, events: Sequence[EnsembleSucceeded]) -> None: return max_memory_usage = -1 - for fm_step in self.ensemble.snapshot.get_all_fm_steps().values(): + for (real_id, _), fm_step in self.ensemble.snapshot.get_all_fm_steps().items(): + # Infer max memory usage memory_usage = fm_step.get(ids.MAX_MEMORY_USAGE) or "-1" max_memory_usage = max(int(memory_usage), max_memory_usage) + + if cpu_message := detect_overspent_cpu( + self.ensemble.reals[int(real_id)].num_cpu, real_id, fm_step + ): + logger.warning(cpu_message) + logger.info( f"Ensemble ran with maximum memory usage for a single realization job: {max_memory_usage}" ) + await self._append_message(self.ensemble.update_snapshot(events)) async def _cancelled_handler(self, events: Sequence[EnsembleCancelled]) -> None: @@ -425,3 +435,24 @@ async def run_and_get_successful_realizations(self) -> List[int]: def _get_ens_id(source: str) -> str: # the ens_id will be found at /ert/ensemble/ens_id/... return source.split("/")[3] + + +def detect_overspent_cpu(num_cpu: int, real_id: str, fm_step: FMStepSnapshot) -> str: + """Produces a message warning about misconfiguration of NUM_CPU if + so is detected. Returns an empty string if everything is ok.""" + now = datetime.datetime.now() + duration = ( + (fm_step.get(ids.END_TIME) or now) - (fm_step.get(ids.START_TIME) or now) + ).total_seconds() + if duration <= 0: + return "" + cpu_seconds = fm_step.get(ids.CPU_SECONDS) or 0.0 + parallelization_obtained = cpu_seconds / duration + if parallelization_obtained > num_cpu: + return ( + f"Misconfigured NUM_CPU, forward model step '{fm_step.get(ids.NAME)}' for " + f"realization {real_id} spent {cpu_seconds} cpu seconds " + f"with wall clock duration {duration:.1f} seconds, " + f"a factor of {parallelization_obtained:.2f}, while NUM_CPU was {num_cpu}." 
+ ) + return "" diff --git a/src/ert/ensemble_evaluator/identifiers.py b/src/ert/ensemble_evaluator/identifiers.py index 31cccb84be3..918b11779ad 100644 --- a/src/ert/ensemble_evaluator/identifiers.py +++ b/src/ert/ensemble_evaluator/identifiers.py @@ -5,6 +5,7 @@ ERROR: Final = "error" INDEX: Final = "index" MAX_MEMORY_USAGE: Final = "max_memory_usage" +CPU_SECONDS: Final = "cpu_seconds" NAME: Final = "name" START_TIME: Final = "start_time" STATUS: Final = "status" diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py index 00448fbd728..35e0157944f 100644 --- a/src/ert/ensemble_evaluator/snapshot.py +++ b/src/ert/ensemble_evaluator/snapshot.py @@ -346,6 +346,7 @@ def update_from_event( if type(event) is ForwardModelStepRunning: fm["current_memory_usage"] = event.current_memory_usage fm["max_memory_usage"] = event.max_memory_usage + fm["cpu_seconds"] = event.cpu_seconds if type(event) is ForwardModelStepStart: fm["stdout"] = event.std_out fm["stderr"] = event.std_err @@ -384,6 +385,7 @@ class FMStepSnapshot(TypedDict, total=False): index: Optional[str] current_memory_usage: Optional[int] max_memory_usage: Optional[int] + cpu_seconds: Optional[float] name: Optional[str] error: Optional[str] stdout: Optional[str] diff --git a/src/ert/resources/forward-models/res/script/ecl_run.py b/src/ert/resources/forward-models/res/script/ecl_run.py index 8f243c243ce..6975da5d632 100644 --- a/src/ert/resources/forward-models/res/script/ecl_run.py +++ b/src/ert/resources/forward-models/res/script/ecl_run.py @@ -1,4 +1,5 @@ import datetime +import glob import os import os.path import re @@ -9,15 +10,39 @@ from argparse import ArgumentParser from collections import namedtuple from contextlib import contextmanager, suppress +from pathlib import Path from random import random +from typing import List import resfo -from ecl_config import EclrunConfig +from ecl_config import EclConfig, EclrunConfig, Simulator from packaging import version +def ecl_output_has_license_error(ecl_output: str): + return ( + "LICENSE ERROR" in ecl_output + or "LICENSE FAILURE" in ecl_output + or "not allowed in license" in ecl_output + ) + + class EclError(RuntimeError): - pass + def failed_due_to_license_problems(self) -> bool: + # self.args[0] contains the multiline ERROR messages and SLAVE startup messages + if ecl_output_has_license_error(self.args[0]): + return True + if re.search(a_slave_failed_pattern, self.args[0]): + for match in re.finditer(slave_run_paths, self.args[0], re.MULTILINE): + (ecl_case_starts_with, ecl_case_dir) = match.groups() + for prt_file in glob.glob( + f"{ecl_case_dir}/{ecl_case_starts_with}*.PRT" + ): + if ecl_output_has_license_error( + Path(prt_file).read_text(encoding="utf-8") + ): + return True + return False def await_process_tee(process, *out_files) -> int: @@ -52,7 +77,14 @@ def await_process_tee(process, *out_files) -> int: EclipseResult = namedtuple("EclipseResult", "errors bugs") body_sub_pattern = r"(\s^\s@.+$)*" date_sub_pattern = r"\s+AT TIME\s+(?P\d+\.\d+)\s+DAYS\s+\((?P(.+)):\s*$" -error_pattern = rf"^\s@-- ERROR{date_sub_pattern}${body_sub_pattern}" +error_pattern_e100 = rf"^\s@-- ERROR{date_sub_pattern}${body_sub_pattern}" +error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}" +slave_started_pattern = ( + rf"^\s@--MESSAGE{date_sub_pattern}\s^\s@\s+STARTING SLAVE.+${body_sub_pattern}" +) +a_slave_failed_pattern = r"\s@\s+SLAVE RUN.*HAS STOPPED WITH AN ERROR CONDITION.\s*" +slave_run_paths = r"^\s@\s+STARTING SLAVE\s+[^ ]+RUNNING \([^ ]\)\s*$" 
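Editor's note: the slave-run handling added above (`failed_due_to_license_problems` plus the `a_slave_failed_pattern`/`slave_run_paths` constants) boils down to scanning the master PRT for "STARTING SLAVE ... RUNNING <case> ... IN DIRECTORY <dir>" messages, then globbing each slave directory for its PRT file and re-running the license check on that text. A rough standalone sketch of that idea, with a deliberately simplified regex and a hypothetical helper name:

```python
# Simplified illustration of the slave-PRT license scan; the regex here is
# intentionally looser than the slave_run_paths pattern defined above.
import glob
import re
from pathlib import Path

SLAVE_RE = re.compile(
    r"STARTING SLAVE\s+\S+\s+RUNNING\s+(\w+)"  # truncated slave case name
    r".*?IN DIRECTORY\s*\n\s*@\s+(\S+)",       # directory on the following line
    re.DOTALL,
)


def slave_license_error(master_prt_text: str) -> bool:
    for case_prefix, directory in SLAVE_RE.findall(master_prt_text):
        for prt_file in glob.glob(f"{directory}/{case_prefix}*.PRT"):
            text = Path(prt_file).read_text(encoding="utf-8")
            if "LICENSE ERROR" in text or "LICENSE FAILURE" in text:
                return True
    return False
```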
+slave_run_paths = r"\s@\s+STARTING SLAVE .* RUNNING (\w+)\s*^\s@\s+ON HOST.*IN DIRECTORY\s*^\s@\s+(.*)" def make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS): @@ -209,7 +241,12 @@ class EclRun: """ def __init__( - self, ecl_case, sim, num_cpu=1, check_status=True, summary_conversion=False + self, + ecl_case: str, + sim: Simulator, + num_cpu: int = 1, + check_status: bool = True, + summary_conversion: bool = False, ): self.sim = sim self.check_status = check_status @@ -243,6 +280,10 @@ def runPath(self): def baseName(self): return self.base_name + @property + def prt_path(self): + return Path(self.run_path) / (self.baseName() + ".PRT") + def numCpu(self): return self.num_cpu @@ -392,10 +433,8 @@ def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None): try: self.assertECLEND() - except RuntimeError as err: - if ( - "LICENSE ERROR" in err.args[0] or "LICENSE FAILURE" in err.args[0] - ) and retries_left > 0: + except EclError as err: + if err.failed_due_to_license_problems() and retries_left > 0: time_to_wait = backoff_sleep + int( random() * self.LICENSE_RETRY_STAGGER_FACTOR ) @@ -414,7 +453,6 @@ def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None): return else: raise err from None - if self.num_cpu > 1: self.summary_block() @@ -455,14 +493,26 @@ def summary_block(self): return ecl_sum def assertECLEND(self): + tail_length = 5000 result = self.readECLEND() if result.errors > 0: error_list = self.parseErrors() sep = "\n\n...\n\n" - error_msg = sep.join(error_list) + error_and_slave_msg = sep.join(error_list) + extra_message = "" + error_messages = [ + error for error in error_list if not "STARTING SLAVE" in str(error) + ] + if result.errors != len(error_messages): + extra_message = ( + f"\n\nWarning, mismatch between stated Error count ({result.errors}) " + f"and number of ERROR messages found in PRT ({len(error_messages)})." 
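Editor's note: the retry path in `runEclipse` above now keys on `err.failed_due_to_license_problems()` instead of substring-matching the RuntimeError text. The staggered-sleep idea it relies on, a base backoff plus a random offset so that many simultaneously failing realizations do not hit the license server in lockstep, looks roughly like the sketch below; the constant value and function names are made up for illustration.

```python
# Hypothetical sketch of retry-with-random-stagger; not the EclRun code itself.
import time
from random import random

LICENSE_RETRY_STAGGER_FACTOR = 60  # seconds; illustrative value


def run_with_license_retries(run_once, retries_left: int = 3, backoff_sleep: int = 60):
    try:
        run_once()
    except RuntimeError as err:
        if "LICENSE" in str(err) and retries_left > 0:
            # Randomize the wait so concurrent realizations retry at different times.
            time.sleep(backoff_sleep + int(random() * LICENSE_RETRY_STAGGER_FACTOR))
            run_with_license_retries(run_once, retries_left - 1, backoff_sleep)
        else:
            raise
```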
+ f"\n\nTail ({tail_length} bytes) of PRT-file {self.prt_path}:\n\n" + ) + tail_textfile(self.prt_path, 5000) + raise EclError( "Eclipse simulation failed with:" - f"{result.errors:d} errors:\n\n{error_msg}" + f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}" ) if result.bugs > 0: @@ -474,7 +524,7 @@ def readECLEND(self): report_file = os.path.join(self.run_path, f"{self.base_name}.ECLEND") if not os.path.isfile(report_file): - report_file = os.path.join(self.run_path, f"{self.base_name}.PRT") + report_file = self.prt_path errors = None bugs = None @@ -494,28 +544,31 @@ def readECLEND(self): return EclipseResult(errors=errors, bugs=bugs) - def parseErrors(self): - prt_file = os.path.join(self.runPath(), f"{self.baseName()}.PRT") + def parseErrors(self) -> List[str]: + """Extract multiline ERROR messages from the PRT file""" error_list = [] - error_regexp = re.compile(error_pattern, re.MULTILINE) - with open(prt_file, "r", encoding="utf-8") as filehandle: + error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE) + error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE) + slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE) + with open(self.prt_path, "r", encoding="utf-8") as filehandle: content = filehandle.read() - offset = 0 - while True: - match = error_regexp.search(content[offset:]) - if match: - error_list.append( - content[offset + match.start() : offset + match.end()] - ) - offset += match.end() - else: - break + for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]: + offset = 0 + while True: + match = regexp.search(content[offset:]) + if match: + error_list.append( + content[offset + match.start() : offset + match.end()] + ) + offset += match.end() + else: + break return error_list -def run(config, argv): +def run(config: EclConfig, argv): parser = ArgumentParser() parser.add_argument("ecl_case") parser.add_argument("-v", "--version", dest="version", type=str) @@ -557,3 +610,14 @@ def run(config, argv): except EclError as msg: print(msg, file=sys.stderr) sys.exit(-1) + + +def tail_textfile(file_path: Path, num_chars: int) -> str: + if not file_path.exists(): + return f"No output file {file_path}" + with open(file_path, encoding="utf-8") as file: + file.seek(0, 2) + file_end_position = file.tell() + seek_position = max(0, file_end_position - num_chars) + file.seek(seek_position) + return file.read()[-num_chars:] diff --git a/src/ert/scheduler/driver.py b/src/ert/scheduler/driver.py index a5a9f124d14..f83071ed6c7 100644 --- a/src/ert/scheduler/driver.py +++ b/src/ert/scheduler/driver.py @@ -83,7 +83,8 @@ async def _execute_with_retry( total_attempts: int = 1, retry_interval: float = 1.0, driverlogger: Optional[logging.Logger] = None, - exit_on_msgs: Iterable[str] = (), + return_on_msgs: Iterable[str] = (), + error_on_msgs: Iterable[str] = (), log_to_debug: Optional[bool] = True, ) -> Tuple[bool, str]: _logger = driverlogger or logging.getLogger(__name__) @@ -121,11 +122,16 @@ async def _execute_with_retry( f'Command "{shlex.join(cmd_with_args)}" succeeded with {outputs}' ) return True, stdout.decode(errors="ignore").strip() - elif exit_on_msgs and any( - exit_on_msg in stderr.decode(errors="ignore") - for exit_on_msg in exit_on_msgs + elif return_on_msgs and any( + return_on_msg in stderr.decode(errors="ignore") + for return_on_msg in return_on_msgs ): return True, stderr.decode(errors="ignore").strip() + elif error_on_msgs and any( + error_on_msg in stderr.decode(errors="ignore") + for error_on_msg in 
error_on_msgs + ): + return False, stderr.decode(errors="ignore").strip() elif process.returncode in retry_codes: error_message = outputs elif process.returncode in accept_codes: diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index cd2f49a9ca6..7c7e364a2dc 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -94,6 +94,7 @@ class RunningJob: LSF_INFO_JSON_FILENAME = "lsf_info.json" FLAKY_SSH_RETURNCODE = 255 JOB_ALREADY_FINISHED_BKILL_MSG = "Job has already finished" +BSUB_FAILURE_MESSAGES = ("Job not submitted",) def _parse_jobs_dict(jobs: Mapping[str, JobState]) -> dict[str, AnyJob]: @@ -340,6 +341,7 @@ async def submit( retry_codes=(FLAKY_SSH_RETURNCODE,), total_attempts=self._bsub_retries, retry_interval=self._sleep_time_between_cmd_retries, + error_on_msgs=BSUB_FAILURE_MESSAGES, ) if not process_success: self._job_error_message_by_iens[iens] = process_message @@ -392,7 +394,7 @@ async def kill(self, iens: int) -> None: retry_codes=(FLAKY_SSH_RETURNCODE,), total_attempts=3, retry_interval=self._sleep_time_between_cmd_retries, - exit_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG), + return_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG), ) await asyncio.create_subprocess_shell( f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL {job_id}", diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index 11f01658b84..668527b6017 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -1,8 +1,11 @@ import asyncio +import datetime from functools import partial from typing import cast import pytest +from hypothesis import given +from hypothesis import strategies as st from _ert.events import ( EESnapshot, @@ -17,7 +20,13 @@ event_to_json, ) from _ert.forward_model_runner.client import Client -from ert.ensemble_evaluator import EnsembleEvaluator, EnsembleSnapshot, Monitor +from ert.ensemble_evaluator import ( + EnsembleEvaluator, + EnsembleSnapshot, + FMStepSnapshot, + Monitor, +) +from ert.ensemble_evaluator.evaluator import detect_overspent_cpu from ert.ensemble_evaluator.state import ( ENSEMBLE_STATE_STARTED, ENSEMBLE_STATE_UNKNOWN, @@ -465,3 +474,30 @@ async def test_ensure_multi_level_events_in_order(evaluator_to_use): if "reals" in event.snapshot: assert ensemble_state == ENSEMBLE_STATE_STARTED ensemble_state = event.snapshot.get("status", ensemble_state) + + +@given( + num_cpu=st.integers(min_value=1, max_value=64), + start=st.datetimes(), + duration=st.integers(min_value=-1, max_value=10000), + cpu_seconds=st.floats(min_value=0), +) +def test_overspent_cpu_is_logged( + num_cpu: int, + start: datetime.datetime, + duration: int, + cpu_seconds: float, +): + message = detect_overspent_cpu( + num_cpu, + "dummy", + FMStepSnapshot( + start_time=start, + end_time=start + datetime.timedelta(seconds=duration), + cpu_seconds=cpu_seconds, + ), + ) + if duration > 0 and cpu_seconds / duration > num_cpu: + assert "Misconfigured NUM_CPU" in message + else: + assert "NUM_CPU" not in message diff --git a/tests/unit_tests/forward_model_runner/test_event_reporter.py b/tests/unit_tests/forward_model_runner/test_event_reporter.py index 71596fdee74..8030580c8c1 100644 --- a/tests/unit_tests/forward_model_runner/test_event_reporter.py +++ b/tests/unit_tests/forward_model_runner/test_event_reporter.py @@ -22,7 +22,7 @@ Exited, Finish, Init, - MemoryStatus, + 
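Editor's note: tying the pieces together, once `cpu_seconds` flows from the runner into `FMStepSnapshot`, `detect_overspent_cpu` only has to compare CPU time against wall-clock time. A quick usage sketch in the spirit of the hypothesis test above, with made-up numbers: a step that reports 250 CPU seconds over a 60 second wall-clock window has used a factor of about 4.2, so `NUM_CPU=1` triggers the warning.

```python
# Illustrative call; mirrors the hypothesis test above with concrete numbers.
import datetime

from ert.ensemble_evaluator import FMStepSnapshot
from ert.ensemble_evaluator.evaluator import detect_overspent_cpu

start = datetime.datetime(2024, 1, 1, 12, 0, 0)
message = detect_overspent_cpu(
    num_cpu=1,
    real_id="0",
    fm_step=FMStepSnapshot(
        start_time=start,
        end_time=start + datetime.timedelta(seconds=60),
        cpu_seconds=250.0,  # 250 / 60 ≈ 4.2 > NUM_CPU=1 -> warning text returned
    ),
)
assert "Misconfigured NUM_CPU" in message
```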
ProcessTreeStatus, Running, Start, ) @@ -124,7 +124,7 @@ def test_report_with_running_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish()) assert len(lines) == 1 @@ -143,7 +143,7 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish()) assert len(lines) == 1 @@ -158,7 +158,7 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish().with_error("massive_failure")) assert len(lines) == 1 @@ -202,9 +202,9 @@ def mock_send(msg): "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) ): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=1100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=1100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10))) # set _stop_timestamp reporter.report(Finish()) if reporter._event_publisher_thread.is_alive(): @@ -240,9 +240,9 @@ def send_func(msg): patched_send.side_effect = send_func reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=200, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=300, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10))) _wait_until( condition=lambda: patched_send.call_count == 3, @@ -278,8 +278,8 @@ def mock_send(msg): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(job1, MemoryStatus(max_rss=100, rss=10))) - reporter.report(Running(job1, MemoryStatus(max_rss=200, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10))) # sleep until both Running events have been received _wait_until( @@ -291,14 +291,14 @@ def mock_send(msg): with patch( "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) ): - reporter.report(Running(job1, MemoryStatus(max_rss=300, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10))) # Make sure the publisher thread exits because it got # ClientConnectionClosedOK. 
If it hangs it could indicate that the # exception is not caught/handled correctly if reporter._event_publisher_thread.is_alive(): reporter._event_publisher_thread.join() - reporter.report(Running(job1, MemoryStatus(max_rss=400, rss=10))) + reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10))) reporter.report(Finish()) # set _stop_timestamp was not set to None since the reporter finished on time diff --git a/tests/unit_tests/forward_model_runner/test_file_reporter.py b/tests/unit_tests/forward_model_runner/test_file_reporter.py index ba9de832387..4ac53b95ee2 100644 --- a/tests/unit_tests/forward_model_runner/test_file_reporter.py +++ b/tests/unit_tests/forward_model_runner/test_file_reporter.py @@ -9,7 +9,7 @@ Exited, Finish, Init, - MemoryStatus, + ProcessTreeStatus, Running, Start, ) @@ -130,7 +130,10 @@ def test_report_with_failed_exit_message_argument(reporter): @pytest.mark.usefixtures("use_tmpdir") def test_report_with_running_message_argument(reporter): - msg = Running(Job({"name": "job1"}, 0), MemoryStatus(max_rss=100, rss=10)) + msg = Running( + Job({"name": "job1"}, 0), + ProcessTreeStatus(max_rss=100, rss=10, cpu_seconds=1.1), + ) reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job]) reporter.report(msg) @@ -144,6 +147,7 @@ def test_report_with_running_message_argument(reporter): assert ( '"current_memory_usage": 10' in content ), "status.json missing current_memory_usage" + assert '"cpu_seconds": 1.1' in content, "status.json missing cpu_seconds" @pytest.mark.usefixtures("use_tmpdir") diff --git a/tests/unit_tests/forward_model_runner/test_job.py b/tests/unit_tests/forward_model_runner/test_job.py index d217087061c..01b65aa6752 100644 --- a/tests/unit_tests/forward_model_runner/test_job.py +++ b/tests/unit_tests/forward_model_runner/test_job.py @@ -1,3 +1,4 @@ +import contextlib import os import pathlib import stat @@ -11,7 +12,7 @@ import numpy as np import pytest -from _ert.forward_model_runner.job import Job, _get_rss_and_oom_score_for_processtree +from _ert.forward_model_runner.job import Job, _get_processtree_data from _ert.forward_model_runner.reporting.message import Exited, Running, Start @@ -40,6 +41,56 @@ def test_run_with_process_failing( next(run) +@pytest.mark.flaky(reruns=5) +@pytest.mark.integration_test +def test_cpu_seconds_can_detect_multiprocess(): + """Run a job that sets of two simultaneous processes that + each run for 1 second. We should be able to detect the total + cpu seconds consumed to be roughly 2 seconds. + + The test is flaky in that it tries to gather cpu_seconds data while + the subprocesses are running. On a loaded CPU this is not very robust, + but the most important catch is to be able to obtain a cpu_second + number that is larger than the busy-wait times of the individual + sub-processes. 
+ """ + pythonscript = "busy.py" + with open(pythonscript, "w", encoding="utf-8") as pyscript: + pyscript.write( + textwrap.dedent( + """\ + import time + now = time.time() + while time.time() < now + 1: + pass""" + ) + ) + scriptname = "saturate_cpus.sh" + with open(scriptname, "w", encoding="utf-8") as script: + script.write( + textwrap.dedent( + """\ + #!/bin/sh + python busy.py & + python busy.py""" + ) + ) + executable = os.path.realpath(scriptname) + os.chmod(scriptname, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + job = Job( + { + "executable": executable, + }, + 0, + ) + job.MEMORY_POLL_PERIOD = 0.1 + cpu_seconds = 0.0 + for status in job.run(): + if isinstance(status, Running): + cpu_seconds = max(cpu_seconds, status.memory_status.cpu_seconds) + assert 1.4 < cpu_seconds < 2.2 + + @pytest.mark.integration_test @pytest.mark.flaky(reruns=5) @pytest.mark.usefixtures("use_tmpdir") @@ -158,19 +209,39 @@ def test_memory_profile_in_running_events(): assert min(timedeltas).total_seconds() >= 0.01 -@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="No oom_score on MacOS") -def test_oom_score_is_max_over_processtree(monkeypatch): - @dataclass - class MockedProcess: - """A very lightweight mocked psutil.Process object""" +@dataclass +class CpuTimes: + """Mocks the response of psutil.Process().cpu_times()""" + + user: float - pid: int - memory_info = MagicMock() - def children(self, recursive: bool = True): - if self.pid == 123: - return [MockedProcess(124)] +@dataclass +class MockedProcess: + """Mocks psutil.Process()""" + pid: int + memory_info = MagicMock() + + def cpu_times(self): + return CpuTimes(user=self.pid / 10.0) + + def children(self, recursive: bool): + assert recursive + if self.pid == 123: + return [MockedProcess(124)] + + def oneshot(self): + return contextlib.nullcontext() + + +def test_cpu_seconds_for_process_with_children(): + (_, cpu_seconds, _) = _get_processtree_data(MockedProcess(123)) + assert cpu_seconds == 123 / 10.0 + 124 / 10.0 + + +@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="No oom_score on MacOS") +def test_oom_score_is_max_over_processtree(): def read_text_side_effect(self: pathlib.Path, *args, **kwargs): if self.absolute() == pathlib.Path("/proc/123/oom_score"): return "234" @@ -179,7 +250,7 @@ def read_text_side_effect(self: pathlib.Path, *args, **kwargs): with patch("pathlib.Path.read_text", autospec=True) as mocked_read_text: mocked_read_text.side_effect = read_text_side_effect - (_, oom_score) = _get_rss_and_oom_score_for_processtree(MockedProcess(123)) + (_, _, oom_score) = _get_processtree_data(MockedProcess(123)) assert oom_score == 456 diff --git a/tests/unit_tests/resources/test_ecl_run_new_config.py b/tests/unit_tests/resources/test_ecl_run_new_config.py index 4349dc0b571..59ff4ccaa51 100644 --- a/tests/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/unit_tests/resources/test_ecl_run_new_config.py @@ -267,3 +267,347 @@ def test_summary_block(source_root): erun.runEclipse(eclrun_config=ecl_config.EclrunConfig(econfig, "2019.3")) assert erun.summary_block() is not None + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl100_license_error_is_caught(): + prt_error = """\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ CHECKING FOR LICENSES + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ LICENSE ERROR -1 FOR MULTI-SEGMENT WELL OPTION + @ FEATURE IS INVALID. 
CHECK YOUR LICENSE FILE AND + @ THE LICENSE LOG FILE""" + eclend = """\ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.00 elapsed 0.08""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl100_crash_is_not_mistaken_as_license_trouble(): + prt_error = """\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ CHECKING FOR LICENSES + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ NON-LINEAR CONVERGENCE FAILURE""" + eclend = """\ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.00 elapsed 0.08""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert not exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl300_license_error_is_caught(): + prt_error = """\ + @--Message:The message service has been activated + @--Message:Checking for licenses + @--Message:Checking for licenses + @--Message:Checking for licenses + @--Error + @ ECLIPSE option not allowed in license + @ Please ask for a new license + @ Run stopping + 0 Mbytes of storage required + No active cells found + 249 Mbytes (image size) +""" + eclend = """\ + Error summary + Comments 1 + Warnings 2 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.01 elapsed 0.02 + Emergency stop called from routine ZSTOPE""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_ecl300_crash_is_not_mistaken_as_license_trouble(): + prt_error = """\ + @--Message:The message service has been activated + @--Message:Checking for licenses + @--Message:Checking for licenses + @--Message:Checking for licenses + @ Run stopping + 0 Mbytes of storage required + No active cells found + 249 Mbytes (image size) +""" + eclend = """\ + Error summary + Comments 1 + Warnings 2 + Problems 0 + Errors 1 + Bugs 0 + Final cpu 0.01 elapsed 0.02 + Emergency stop called from routine ZSTOPE""" + + Path("FOO.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") + Path("FOO.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert not exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_license_error_in_slave_is_caught(): + """If a coupled Eclipse model fails in one of the slave runs + due to license issues, there is no trace of licence in the master PRT file. 
+ + The master PRT file must be trace for the paths to the SLAVE runs + and then those PRT files must be parsed. + + Note that the name of the DATA file is truncated in the MESSAGE listing + the slaves. + """ + Path("slave1").mkdir() + Path("slave2").mkdir() + master_prt_error = f"""\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE1 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave1 + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE2 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave2 + + + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ SLAVE RUN SLAVE2 HAS STOPPED WITH AN ERROR CONDITION. + @ MASTER RUN AND REMAINING SLAVES WILL ALSO STOP. + """ + master_eclend = """ + Error summary + Comments 1 + Warnings 1 + Problems 0 + Errors 1 + Bugs 0""" + + Path("EIGHTCELLS_MASTER.PRT").write_text( + master_prt_error + "\n" + master_eclend, encoding="utf-8" + ) + Path("EIGHTCELLS_MASTER.ECLEND").write_text(master_eclend, encoding="utf-8") + + slave_prt_error = """\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ LICENSE ERROR -15 FOR MULTI-SEGMENT WELL OPTION + @ FEATURE IS INVALID. CHECK YOUR LICENSE FILE AND + @ THE LICENSE LOG FILE + """ + Path("slave1/EIGHTCELLS_SLAVE.PRT").write_text("", encoding="utf-8") + Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") + Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_crash_in_slave_is_not_mistaken_as_license(): + Path("slave1").mkdir() + Path("slave2").mkdir() + master_prt_error = f"""\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE1 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave1 + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE2 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ {os.getcwd()}/slave2 + + + + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ SLAVE RUN SLAVE2 HAS STOPPED WITH AN ERROR CONDITION. + @ MASTER RUN AND REMAINING SLAVES WILL ALSO STOP. 
+ """ + master_eclend = """ + Error summary + Comments 1 + Warnings 1 + Problems 0 + Errors 1 + Bugs 0""" + + Path("EIGHTCELLS_MASTER.PRT").write_text( + master_prt_error + "\n" + master_eclend, encoding="utf-8" + ) + Path("EIGHTCELLS_MASTER.ECLEND").write_text(master_eclend, encoding="utf-8") + + slave_prt_error = """\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ NON-LINEAR CONVERGENCE FAILURE + """ + Path("slave1/EIGHTCELLS_SLAVE.PRT").write_text("", encoding="utf-8") + Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") + Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert not exception_info.value.failed_due_to_license_problems() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_too_few_parsed_error_messages_gives_warning(): + prt_error = """\ + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE""" + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "Warning, mismatch between stated Error count" in str(exception_info.value) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_tail_of_prt_file_is_included_when_error_count_inconsistency(): + prt_error = ( + "this_should_not_be_included " + + "\n" * 10000 + + """ + this_should_be_included + + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS JUST A MESSAGE, NOTHING ELSE""" + ) + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "this_should_be_included" in str(exception_info.value) + assert "this_should_not_be_included" not in str(exception_info.value) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_correct_number_of_parsed_error_messages_gives_no_warning(): + prt_error = """\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS A DUMMY ERROR MESSAGE""" + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "Warning, mismatch between stated Error count" not in str( + exception_info.value + ) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_slave_started_message_are_not_counted_as_errors(): + prt_error = f"""\ + @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): + @ THIS IS A DUMMY ERROR MESSAGE + + @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): + @ STARTING SLAVE SLAVE1 RUNNING EIGHTCEL + @ ON HOST localhost IN DIRECTORY + @ 
{os.getcwd()}/slave1""" + eclend = """ + Error summary + Comments 0 + Warnings 0 + Problems 0 + Errors 1 + Bugs 0""" + + Path("ECLCASE.PRT").write_text(prt_error + "\n" + eclend, encoding="utf-8") + Path("ECLCASE.ECLEND").write_text(eclend, encoding="utf-8") + + Path("ECLCASE.DATA").write_text("", encoding="utf-8") + + run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") + with pytest.raises(ecl_run.EclError) as exception_info: + run.assertECLEND() + assert "Warning, mismatch between stated Error count" not in str( + exception_info.value + ) diff --git a/tests/unit_tests/scheduler/test_lsf_driver.py b/tests/unit_tests/scheduler/test_lsf_driver.py index 9c2cbe9cbd2..bd701b4a203 100644 --- a/tests/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/unit_tests/scheduler/test_lsf_driver.py @@ -560,6 +560,45 @@ async def test_that_bsub_will_retry_and_fail( await driver.submit(0, "sleep 10") +@pytest.mark.parametrize( + ("exit_code, error_msg"), + [ + # All these have been manually obtained on the command line by perturbing the command arguments to bsub: + (255, "No such queue. Job not submitted"), + (255, "Too many processors requested. Job not submitted."), + (255, 'Error near "select" : duplicate section. Job not submitted.'), + ( + 255, + "Error in select section: Expected number, string, " + 'name, or "(" but found end of section. Job not submitted.', + ), + ( + 255, + "Error with :" + " '&' cannot be used in the resource requirement section. Job not submitted.", + ), + (255, "Error in rusage section. Job not submitted."), + (255, "Job not submitted."), + ], +) +async def test_that_bsub_will_fail_without_retries( + monkeypatch, tmp_path, exit_code, error_msg +): + monkeypatch.chdir(tmp_path) + bin_path = Path("bin") + bin_path.mkdir() + monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}") + bsub_path = bin_path / "bsub" + bsub_path.write_text( + f'#!/bin/sh\necho . >> bsubcalls\necho "{error_msg}" >&2\nexit {exit_code}' + ) + bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC) + driver = LsfDriver() + with pytest.raises(RuntimeError): + await driver.submit(0, "sleep 10") + assert len(Path("bsubcalls").read_text(encoding="utf-8").strip()) == 1 + + @pytest.mark.parametrize( ("exit_code, error_msg"), [
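Editor's note: on the scheduler side, the old `exit_on_msgs` parameter of `_execute_with_retry` is split into `return_on_msgs` (treat the stderr match as success, e.g. bkill's "Job has already finished") and `error_on_msgs` (treat it as a permanent failure that should not be retried, e.g. bsub's "Job not submitted"). A stripped-down sketch of that classification step, with a hypothetical helper name and return shape:

```python
# Hypothetical, stripped-down version of the stderr classification added to
# Driver._execute_with_retry; names and return shape are illustrative only.
from typing import Iterable, Optional, Tuple

JOB_ALREADY_FINISHED_BKILL_MSG = "Job has already finished"
BSUB_FAILURE_MESSAGES = ("Job not submitted",)


def classify_stderr(
    stderr: str,
    return_on_msgs: Iterable[str] = (),
    error_on_msgs: Iterable[str] = (),
) -> Optional[Tuple[bool, str]]:
    """Return (success, message) when stderr is conclusive, or None to keep retrying."""
    if any(msg in stderr for msg in return_on_msgs):
        return True, stderr.strip()   # benign: e.g. the job was already gone
    if any(msg in stderr for msg in error_on_msgs):
        return False, stderr.strip()  # permanent: retrying bsub will not help
    return None


# Example:
# classify_stderr("Too many processors requested. Job not submitted.",
#                 error_on_msgs=BSUB_FAILURE_MESSAGES)
# -> (False, "Too many processors requested. Job not submitted.")
```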