Skip to content

Commit

Permalink
Remove tracker and tracker_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sondreso committed Apr 9, 2024
1 parent 48a15b7 commit d45e511
Show file tree
Hide file tree
Showing 26 changed files with 395 additions and 770 deletions.
17 changes: 7 additions & 10 deletions src/ert/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import contextlib
import logging
import os
import queue
import sys
from typing import Any, TextIO

Expand All @@ -19,7 +20,7 @@
from ert.cli.workflow import execute_workflow
from ert.config import ErtConfig, QueueSystem
from ert.enkf_main import EnKFMain
from ert.ensemble_evaluator import EvaluatorServerConfig, EvaluatorTracker
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.namespace import Namespace
from ert.storage import open_storage
from ert.storage.local_storage import local_storage_set_ert_config
Expand Down Expand Up @@ -76,11 +77,13 @@ def run_cli(args: Namespace, _: Any = None) -> None:
execute_workflow(ert, storage, args.name)
return

status_queue = queue.SimpleQueue()
try:
model = create_model(
ert_config,
storage,
args,
status_queue,
)
except ValueError as e:
raise ErtCliError(e) from e
Expand All @@ -106,11 +109,6 @@ def run_cli(args: Namespace, _: Any = None) -> None:
target=model.start_simulations_thread,
args=(evaluator_server_config,),
)
thread.start()

tracker = EvaluatorTracker(
model, ee_con_info=evaluator_server_config.get_connection_info()
)

with contextlib.ExitStack() as exit_stack:
out: TextIO
Expand All @@ -121,13 +119,12 @@ def run_cli(args: Namespace, _: Any = None) -> None:
else:
out = sys.stderr
monitor = Monitor(out=out, color_always=args.color_always)

thread.start()
try:
monitor.monitor(tracker.track())
monitor.monitor(status_queue)
except (SystemExit, KeyboardInterrupt, OSError):
# _base_service.py translates CTRL-c to OSError
print("\nKilling simulations...")
tracker.request_termination()
model.cancel()

thread.join()
storage.close()
Expand Down
42 changes: 33 additions & 9 deletions src/ert/cli/model_factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from queue import SimpleQueue
from typing import TYPE_CHECKING, Tuple

import numpy as np
Expand Down Expand Up @@ -45,6 +46,7 @@ def create_model(
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue,
) -> BaseRunModel:
logger = logging.getLogger(__name__)
logger.info(
Expand All @@ -57,26 +59,33 @@ def create_model(
update_settings = config.analysis_config.observation_settings

if args.mode == TEST_RUN_MODE:
return _setup_single_test_run(config, storage, args)
return _setup_single_test_run(config, storage, args, status_queue)
elif args.mode == ENSEMBLE_EXPERIMENT_MODE:
return _setup_ensemble_experiment(config, storage, args)
return _setup_ensemble_experiment(config, storage, args, status_queue)
elif args.mode == EVALUATE_ENSEMBLE_MODE:
return _setup_evaluate_ensemble(config, storage, args)
return _setup_evaluate_ensemble(config, storage, args, status_queue)
elif args.mode == ENSEMBLE_SMOOTHER_MODE:
return _setup_ensemble_smoother(config, storage, args, update_settings)
return _setup_ensemble_smoother(
config, storage, args, update_settings, status_queue
)
elif args.mode == ES_MDA_MODE:
return _setup_multiple_data_assimilation(config, storage, args, update_settings)
return _setup_multiple_data_assimilation(
config, storage, args, update_settings, status_queue
)
elif args.mode == ITERATIVE_ENSEMBLE_SMOOTHER_MODE:
return _setup_iterative_ensemble_smoother(
config, storage, args, update_settings
config, storage, args, update_settings, status_queue
)

else:
raise NotImplementedError(f"Run type not supported {args.mode}")


def _setup_single_test_run(
config: ErtConfig, storage: Storage, args: Namespace
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue,
) -> SingleTestRun:
return SingleTestRun(
SingleTestRunArguments(
Expand All @@ -89,11 +98,15 @@ def _setup_single_test_run(
),
config,
storage,
status_queue,
)


def _setup_ensemble_experiment(
config: ErtConfig, storage: Storage, args: Namespace
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue,
) -> EnsembleExperiment:
min_realizations_count = config.analysis_config.minimum_required_realizations
active_realizations = _realizations(args, config.model_config.num_realizations)
Expand Down Expand Up @@ -124,11 +137,15 @@ def _setup_ensemble_experiment(
config,
storage,
config.queue_config,
status_queue=status_queue,
)


def _setup_evaluate_ensemble(
config: ErtConfig, storage: Storage, args: Namespace
config: ErtConfig,
storage: Storage,
args: Namespace,
status_queue: SimpleQueue,
) -> EvaluateEnsemble:
min_realizations_count = config.analysis_config.minimum_required_realizations
active_realizations = _realizations(args, config.model_config.num_realizations)
Expand All @@ -154,6 +171,7 @@ def _setup_evaluate_ensemble(
config,
storage,
config.queue_config,
status_queue=status_queue,
)


Expand All @@ -162,6 +180,7 @@ def _setup_ensemble_smoother(
storage: Storage,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue,
) -> EnsembleSmoother:
return EnsembleSmoother(
ESRunArguments(
Expand All @@ -181,6 +200,7 @@ def _setup_ensemble_smoother(
config.queue_config,
es_settings=config.analysis_config.es_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand Down Expand Up @@ -208,6 +228,7 @@ def _setup_multiple_data_assimilation(
storage: Storage,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue,
) -> MultipleDataAssimilation:
restart_run, prior_ensemble = _determine_restart_info(args)

Expand All @@ -231,6 +252,7 @@ def _setup_multiple_data_assimilation(
config.queue_config,
es_settings=config.analysis_config.es_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand All @@ -239,6 +261,7 @@ def _setup_iterative_ensemble_smoother(
storage: Storage,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue,
) -> IteratedEnsembleSmoother:
return IteratedEnsembleSmoother(
SIESRunArguments(
Expand All @@ -260,6 +283,7 @@ def _setup_iterative_ensemble_smoother(
config.queue_config,
config.analysis_config.ies_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand Down
9 changes: 6 additions & 3 deletions src/ert/cli/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
import sys
from datetime import datetime, timedelta
from typing import Dict, Iterator, Optional, TextIO, Tuple, Union
from queue import SimpleQueue
from typing import Dict, Optional, TextIO, Tuple

from tqdm import tqdm

Expand Down Expand Up @@ -57,13 +58,15 @@ def __init__(self, out: TextIO = sys.stdout, color_always: bool = False) -> None

# The dot adds no value without color, so remove it.
self.dot = ""
self.done = False

def monitor(
self,
events: Iterator[Union[FullSnapshotEvent, SnapshotUpdateEvent, EndEvent]],
event_queue: SimpleQueue,
) -> None:
self._start_time = datetime.now()
for event in events:
while True:
event = event_queue.get()
if isinstance(event, FullSnapshotEvent):
if event.snapshot is not None:
self._snapshots[event.iteration] = event.snapshot
Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
)
from .config import EvaluatorServerConfig
from .evaluator import EnsembleEvaluator
from .evaluator_tracker import EvaluatorTracker
from .event import EndEvent, FullSnapshotEvent, SnapshotUpdateEvent
from .monitor import Monitor
from .snapshot import PartialSnapshot, Snapshot
Expand All @@ -18,7 +17,6 @@
"EnsembleBuilder",
"EnsembleEvaluator",
"EvaluatorServerConfig",
"EvaluatorTracker",
"ForwardModel",
"FullSnapshotEvent",
"Monitor",
Expand Down
9 changes: 5 additions & 4 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ def _run_server(self, loop: asyncio.AbstractEventLoop) -> None:
loop.run_until_complete(self.evaluator_server())
logger.debug("Server thread exiting.")

def _start_running(self) -> None:
def start_running(self) -> None:
self._ws_thread.start()
self._ensemble.evaluate(self._config)
logger.debug("Started evaluator, joining until shutdown")

def _stop(self) -> None:
if not self._done.done():
Expand All @@ -419,11 +420,11 @@ def _signal_cancel(self) -> None:
logger.debug("Stopping current ensemble")
self._loop.call_soon_threadsafe(self._stop)

def run_and_get_successful_realizations(self) -> List[int]:
self._start_running()
logger.debug("Started evaluator, joining until shutdown")
def join(self) -> None:
self._ws_thread.join()
logger.debug("Evaluator is done")

def get_successful_realizations(self) -> List[int]:
return self._ensemble.get_successful_realizations()

@staticmethod
Expand Down
Loading

0 comments on commit d45e511

Please sign in to comment.