Skip to content

Commit

Permalink
Remove tracker and tracker_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sondreso committed Apr 22, 2024
1 parent ea34ecc commit 4fad2f9
Show file tree
Hide file tree
Showing 27 changed files with 415 additions and 781 deletions.
20 changes: 10 additions & 10 deletions src/ert/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#!/usr/bin/env python
from __future__ import annotations

import contextlib
import logging
import os
import queue
import sys
from typing import Any, TextIO

Expand All @@ -19,8 +22,9 @@
from ert.cli.workflow import execute_workflow
from ert.config import ErtConfig, QueueSystem
from ert.enkf_main import EnKFMain
from ert.ensemble_evaluator import EvaluatorServerConfig, EvaluatorTracker
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.namespace import Namespace
from ert.run_models.base_run_model import StatusEvents
from ert.storage import open_storage
from ert.storage.local_storage import local_storage_set_ert_config

Expand Down Expand Up @@ -76,11 +80,13 @@ def run_cli(args: Namespace, _: Any = None) -> None:
execute_workflow(ert, storage, args.name)
return

status_queue: queue.SimpleQueue[StatusEvents] = queue.SimpleQueue()
try:
model = create_model(
ert_config,
storage,
args,
status_queue,
)
except ValueError as e:
raise ErtCliError(e) from e
Expand All @@ -106,11 +112,6 @@ def run_cli(args: Namespace, _: Any = None) -> None:
target=model.start_simulations_thread,
args=(evaluator_server_config,),
)
thread.start()

tracker = EvaluatorTracker(
model, ee_con_info=evaluator_server_config.get_connection_info()
)

with contextlib.ExitStack() as exit_stack:
out: TextIO
Expand All @@ -121,13 +122,12 @@ def run_cli(args: Namespace, _: Any = None) -> None:
else:
out = sys.stderr
monitor = Monitor(out=out, color_always=args.color_always)

thread.start()
try:
monitor.monitor(tracker.track())
monitor.monitor(status_queue)
except (SystemExit, KeyboardInterrupt, OSError):
# _base_service.py translates CTRL-c to OSError
print("\nKilling simulations...")
tracker.request_termination()
model.cancel()

thread.join()
storage.close()
Expand Down
43 changes: 34 additions & 9 deletions src/ert/cli/model_factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from queue import SimpleQueue
from typing import TYPE_CHECKING, Tuple

import numpy as np
Expand Down Expand Up @@ -38,13 +39,15 @@
import numpy.typing as npt

from ert.namespace import Namespace
from ert.run_models.base_run_model import StatusEvents
from ert.storage import Storage


def create_model(
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue[StatusEvents],
) -> BaseRunModel:
logger = logging.getLogger(__name__)
logger.info(
Expand All @@ -57,26 +60,33 @@ def create_model(
update_settings = config.analysis_config.observation_settings

if args.mode == TEST_RUN_MODE:
return _setup_single_test_run(config, storage, args)
return _setup_single_test_run(config, storage, args, status_queue)
elif args.mode == ENSEMBLE_EXPERIMENT_MODE:
return _setup_ensemble_experiment(config, storage, args)
return _setup_ensemble_experiment(config, storage, args, status_queue)
elif args.mode == EVALUATE_ENSEMBLE_MODE:
return _setup_evaluate_ensemble(config, storage, args)
return _setup_evaluate_ensemble(config, storage, args, status_queue)
elif args.mode == ENSEMBLE_SMOOTHER_MODE:
return _setup_ensemble_smoother(config, storage, args, update_settings)
return _setup_ensemble_smoother(
config, storage, args, update_settings, status_queue
)
elif args.mode == ES_MDA_MODE:
return _setup_multiple_data_assimilation(config, storage, args, update_settings)
return _setup_multiple_data_assimilation(
config, storage, args, update_settings, status_queue
)
elif args.mode == ITERATIVE_ENSEMBLE_SMOOTHER_MODE:
return _setup_iterative_ensemble_smoother(
config, storage, args, update_settings
config, storage, args, update_settings, status_queue
)

else:
raise NotImplementedError(f"Run type not supported {args.mode}")


def _setup_single_test_run(
config: ErtConfig, storage: Storage, args: Namespace
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue[StatusEvents],
) -> SingleTestRun:
return SingleTestRun(
SingleTestRunArguments(
Expand All @@ -89,11 +99,15 @@ def _setup_single_test_run(
),
config,
storage,
status_queue,
)


def _setup_ensemble_experiment(
config: ErtConfig, storage: Storage, args: Namespace
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue[StatusEvents],
) -> EnsembleExperiment:
min_realizations_count = config.analysis_config.minimum_required_realizations
active_realizations = _realizations(args, config.model_config.num_realizations)
Expand Down Expand Up @@ -124,11 +138,15 @@ def _setup_ensemble_experiment(
config,
storage,
config.queue_config,
status_queue=status_queue,
)


def _setup_evaluate_ensemble(
config: ErtConfig, storage: Storage, args: Namespace
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue[StatusEvents],
) -> EvaluateEnsemble:
min_realizations_count = config.analysis_config.minimum_required_realizations
active_realizations = _realizations(args, config.model_config.num_realizations)
Expand All @@ -154,6 +172,7 @@ def _setup_evaluate_ensemble(
config,
storage,
config.queue_config,
status_queue=status_queue,
)


Expand All @@ -162,6 +181,7 @@ def _setup_ensemble_smoother(
storage: Storage,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue[StatusEvents],
) -> EnsembleSmoother:
return EnsembleSmoother(
ESRunArguments(
Expand All @@ -181,6 +201,7 @@ def _setup_ensemble_smoother(
config.queue_config,
es_settings=config.analysis_config.es_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand Down Expand Up @@ -208,6 +229,7 @@ def _setup_multiple_data_assimilation(
storage: Storage,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue[StatusEvents],
) -> MultipleDataAssimilation:
restart_run, prior_ensemble = _determine_restart_info(args)

Expand All @@ -231,6 +253,7 @@ def _setup_multiple_data_assimilation(
config.queue_config,
es_settings=config.analysis_config.es_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand All @@ -239,6 +262,7 @@ def _setup_iterative_ensemble_smoother(
storage: Storage,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue[StatusEvents],
) -> IteratedEnsembleSmoother:
return IteratedEnsembleSmoother(
SIESRunArguments(
Expand All @@ -260,6 +284,7 @@ def _setup_iterative_ensemble_smoother(
config.queue_config,
config.analysis_config.ies_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand Down
12 changes: 9 additions & 3 deletions src/ert/cli/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

import sys
from datetime import datetime, timedelta
from typing import Dict, Iterator, Optional, TextIO, Tuple, Union
from queue import SimpleQueue
from typing import Dict, Optional, TextIO, Tuple

from tqdm import tqdm

Expand All @@ -18,6 +21,7 @@
FORWARD_MODEL_STATE_FAILURE,
REAL_STATE_TO_COLOR,
)
from ert.run_models.base_run_model import StatusEvents
from ert.shared.status.utils import format_running_time

Color = Tuple[int, int, int]
Expand Down Expand Up @@ -57,13 +61,15 @@ def __init__(self, out: TextIO = sys.stdout, color_always: bool = False) -> None

# The dot adds no value without color, so remove it.
self.dot = ""
self.done = False

def monitor(
self,
events: Iterator[Union[FullSnapshotEvent, SnapshotUpdateEvent, EndEvent]],
event_queue: SimpleQueue[StatusEvents],
) -> None:
self._start_time = datetime.now()
for event in events:
while True:
event = event_queue.get()
if isinstance(event, FullSnapshotEvent):
if event.snapshot is not None:
self._snapshots[event.iteration] = event.snapshot
Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
)
from .config import EvaluatorServerConfig
from .evaluator import EnsembleEvaluator
from .evaluator_tracker import EvaluatorTracker
from .event import EndEvent, FullSnapshotEvent, SnapshotUpdateEvent
from .monitor import Monitor
from .snapshot import PartialSnapshot, Snapshot
Expand All @@ -18,7 +17,6 @@
"EnsembleBuilder",
"EnsembleEvaluator",
"EvaluatorServerConfig",
"EvaluatorTracker",
"ForwardModel",
"FullSnapshotEvent",
"Monitor",
Expand Down
8 changes: 4 additions & 4 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def _run_server(self, loop: asyncio.AbstractEventLoop) -> None:
loop.run_until_complete(self.evaluator_server())
logger.debug("Server thread exiting.")

def _start_running(self) -> None:
def start_running(self) -> None:
self._ws_thread.start()
self._ensemble.evaluate(self._config)

Expand Down Expand Up @@ -419,11 +419,11 @@ def _signal_cancel(self) -> None:
logger.debug("Stopping current ensemble")
self._loop.call_soon_threadsafe(self._stop)

def run_and_get_successful_realizations(self) -> List[int]:
self._start_running()
logger.debug("Started evaluator, joining until shutdown")
def join(self) -> None:
self._ws_thread.join()
logger.debug("Evaluator is done")

def get_successful_realizations(self) -> List[int]:
return self._ensemble.get_successful_realizations()

@staticmethod
Expand Down
Loading

0 comments on commit 4fad2f9

Please sign in to comment.