Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 11.0 havb #8793

Merged
merged 9 commits into from
Sep 25, 2024
1 change: 1 addition & 0 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class ForwardModelStepRunning(ForwardModelStepBaseEvent):
event_type: Id.FORWARD_MODEL_STEP_RUNNING_TYPE = Id.FORWARD_MODEL_STEP_RUNNING
max_memory_usage: Union[int, None] = None
current_memory_usage: Union[int, None] = None
cpu_seconds: float = 0.0


class ForwardModelStepSuccess(ForwardModelStepBaseEvent):
Expand Down
4 changes: 2 additions & 2 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datetime import datetime

from _ert.forward_model_runner import reporting
from _ert.forward_model_runner.reporting.message import Finish, MemoryStatus
from _ert.forward_model_runner.reporting.message import Finish, ProcessTreeStatus
from _ert.forward_model_runner.runner import ForwardModelRunner

JOBS_FILE = "jobs.json"
Expand Down Expand Up @@ -62,7 +62,7 @@ def _setup_logging(directory: str = "logs"):
memory_csv_logger.addHandler(csv_handler)
memory_csv_logger.setLevel(logging.INFO)
# Write the CSV header to the file:
memory_csv_logger.info(MemoryStatus().csv_header())
memory_csv_logger.info(ProcessTreeStatus().csv_header())

job_runner_logger.addHandler(handler)
job_runner_logger.setLevel(logging.DEBUG)
Expand Down
23 changes: 14 additions & 9 deletions src/_ert/forward_model_runner/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .io import assert_file_executable
from .reporting.message import (
Exited,
MemoryStatus,
ProcessTreeStatus,
Running,
Start,
)
Expand Down Expand Up @@ -179,15 +179,16 @@ def ensure_file_handles_closed():

max_memory_usage = 0
while exit_code is None:
(memory_rss, oom_score) = _get_rss_and_oom_score_for_processtree(process)
(memory_rss, cpu_seconds, oom_score) = _get_processtree_data(process)
max_memory_usage = max(memory_rss, max_memory_usage)
yield Running(
self,
MemoryStatus(
ProcessTreeStatus(
rss=memory_rss,
max_rss=max_memory_usage,
fm_step_id=self.index,
fm_step_name=self.job_data.get("name"),
cpu_seconds=cpu_seconds,
oom_score=oom_score,
),
)
Expand Down Expand Up @@ -349,9 +350,9 @@ def _check_target_file_is_written(self, target_file_mtime: int, timeout=5):
return f"Could not find target_file:{target_file}"


def _get_rss_and_oom_score_for_processtree(
def _get_processtree_data(
process: Process,
) -> Tuple[int, Optional[int]]:
) -> Tuple[int, float, Optional[int]]:
"""Obtain the oom_score (the Linux kernel uses this number to
decide which process to kill first in out-of-memory situations).

Expand All @@ -370,14 +371,16 @@ def _get_rss_and_oom_score_for_processtree(
oom_score = None
# A value of None means that we have no information.
memory_rss = 0
cpu_seconds = 0.0
with contextlib.suppress(ValueError, FileNotFoundError):
oom_score = int(
Path(f"/proc/{process.pid}/oom_score").read_text(encoding="utf-8")
)
with contextlib.suppress(
ValueError, NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError
):
), process.oneshot():
memory_rss = process.memory_info().rss
cpu_seconds = process.cpu_times().user

with contextlib.suppress(
NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError
Expand All @@ -399,7 +402,9 @@ def _get_rss_and_oom_score_for_processtree(
if oom_score is not None
else oom_score_child
)
with contextlib.suppress(NoSuchProcess, AccessDenied, ZombieProcess):
with contextlib.suppress(
NoSuchProcess, AccessDenied, ZombieProcess
), child.oneshot():
memory_rss += child.memory_info().rss

return (memory_rss, oom_score)
cpu_seconds += child.cpu_times().user
return (memory_rss, cpu_seconds, oom_score)
1 change: 1 addition & 0 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def _job_handler(self, msg: Union[Start, Running, Exited]):
**job_msg,
max_memory_usage=msg.memory_status.max_rss,
current_memory_usage=msg.memory_status.rss,
cpu_seconds=msg.memory_status.cpu_seconds,
)
self._dump_event(event)

Expand Down
1 change: 1 addition & 0 deletions src/_ert/forward_model_runner/reporting/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def report(self, msg: Message):
job_status.update(
max_memory_usage=msg.memory_status.max_rss,
current_memory_usage=msg.memory_status.rss,
cpu_seconds=msg.memory_status.cpu_seconds,
status=_JOB_STATUS_RUNNING,
)
memory_logger.info(msg.memory_status)
Expand Down
8 changes: 5 additions & 3 deletions src/_ert/forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class ChecksumDict(_ChecksumDictBase, total=False):


@dataclasses.dataclass
class MemoryStatus:
"""Holds memory information that can be represented as a line of CSV data"""
class ProcessTreeStatus:
"""Holds processtree information that can be represented as a line of CSV data"""

timestamp: str = ""
fm_step_id: Optional[int] = None
Expand All @@ -46,6 +46,8 @@ class MemoryStatus:
max_rss: Optional[int] = None
free: Optional[int] = None

cpu_seconds: float = 0.0

oom_score: Optional[int] = None

def __post_init__(self):
Expand Down Expand Up @@ -119,7 +121,7 @@ def __init__(self, job):


class Running(Message):
def __init__(self, job: "Job", memory_status: MemoryStatus):
def __init__(self, job: "Job", memory_status: ProcessTreeStatus):
super().__init__(job)
self.memory_status = memory_status

Expand Down
33 changes: 32 additions & 1 deletion src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
import logging
import traceback
from contextlib import asynccontextmanager, contextmanager
Expand Down Expand Up @@ -47,6 +48,7 @@
)
from ert.ensemble_evaluator import identifiers as ids

from ._ensemble import FMStepSnapshot
from ._ensemble import LegacyEnsemble as Ensemble
from .config import EvaluatorServerConfig
from .snapshot import EnsembleSnapshot
Expand Down Expand Up @@ -161,12 +163,20 @@ async def _stopped_handler(self, events: Sequence[EnsembleSucceeded]) -> None:
return

max_memory_usage = -1
for fm_step in self.ensemble.snapshot.get_all_fm_steps().values():
for (real_id, _), fm_step in self.ensemble.snapshot.get_all_fm_steps().items():
# Infer max memory usage
memory_usage = fm_step.get(ids.MAX_MEMORY_USAGE) or "-1"
max_memory_usage = max(int(memory_usage), max_memory_usage)

if cpu_message := detect_overspent_cpu(
self.ensemble.reals[int(real_id)].num_cpu, real_id, fm_step
):
logger.warning(cpu_message)

logger.info(
f"Ensemble ran with maximum memory usage for a single realization job: {max_memory_usage}"
)

await self._append_message(self.ensemble.update_snapshot(events))

async def _cancelled_handler(self, events: Sequence[EnsembleCancelled]) -> None:
Expand Down Expand Up @@ -425,3 +435,24 @@ async def run_and_get_successful_realizations(self) -> List[int]:
def _get_ens_id(source: str) -> str:
# the ens_id will be found at /ert/ensemble/ens_id/...
return source.split("/")[3]


def detect_overspent_cpu(num_cpu: int, real_id: str, fm_step: FMStepSnapshot) -> str:
    """Return a warning text if a forward model step used more CPU than NUM_CPU.

    The CPU seconds recorded for the step are divided by its wall clock
    duration; if that ratio is larger than ``num_cpu`` a message describing
    the suspected NUM_CPU misconfiguration is returned.

    A missing start or end time in the snapshot is replaced by the current
    time. When the resulting duration is zero or negative, no ratio can be
    computed and nothing is reported.

    Returns:
        The warning message, or an empty string when nothing was detected.
    """
    now = datetime.datetime.now()
    # Evaluate the end time before the start time, each falling back to "now".
    end_time = fm_step.get(ids.END_TIME) or now
    start_time = fm_step.get(ids.START_TIME) or now
    duration = (end_time - start_time).total_seconds()
    if duration <= 0:
        return ""

    cpu_seconds = fm_step.get(ids.CPU_SECONDS) or 0.0
    parallelization_obtained = cpu_seconds / duration
    overspent = parallelization_obtained > num_cpu
    if not overspent:
        return ""

    return (
        f"Misconfigured NUM_CPU, forward model step '{fm_step.get(ids.NAME)}' for "
        f"realization {real_id} spent {cpu_seconds} cpu seconds "
        f"with wall clock duration {duration:.1f} seconds, "
        f"a factor of {parallelization_obtained:.2f}, while NUM_CPU was {num_cpu}."
    )
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
ERROR: Final = "error"
INDEX: Final = "index"
MAX_MEMORY_USAGE: Final = "max_memory_usage"
CPU_SECONDS: Final = "cpu_seconds"
NAME: Final = "name"
START_TIME: Final = "start_time"
STATUS: Final = "status"
Expand Down
2 changes: 2 additions & 0 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def update_from_event(
if type(event) is ForwardModelStepRunning:
fm["current_memory_usage"] = event.current_memory_usage
fm["max_memory_usage"] = event.max_memory_usage
fm["cpu_seconds"] = event.cpu_seconds
if type(event) is ForwardModelStepStart:
fm["stdout"] = event.std_out
fm["stderr"] = event.std_err
Expand Down Expand Up @@ -384,6 +385,7 @@ class FMStepSnapshot(TypedDict, total=False):
index: Optional[str]
current_memory_usage: Optional[int]
max_memory_usage: Optional[int]
cpu_seconds: Optional[float]
name: Optional[str]
error: Optional[str]
stdout: Optional[str]
Expand Down
Loading