Skip to content

Commit

Permalink
Remove tracker and tracker_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sondreso committed Jan 30, 2024
1 parent 6b784ef commit 8cf334e
Show file tree
Hide file tree
Showing 27 changed files with 390 additions and 479 deletions.
17 changes: 7 additions & 10 deletions src/ert/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import contextlib
import logging
import os
import queue
import sys
import threading
from typing import Any, TextIO
Expand All @@ -19,7 +20,7 @@
from ert.cli.workflow import execute_workflow
from ert.config import ErtConfig, QueueSystem
from ert.enkf_main import EnKFMain
from ert.ensemble_evaluator import EvaluatorServerConfig, EvaluatorTracker
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.namespace import Namespace
from ert.shared.feature_toggling import FeatureToggling
from ert.storage import open_storage
Expand Down Expand Up @@ -82,11 +83,13 @@ def run_cli(args: Namespace, _: Any = None) -> None:
execute_workflow(ert, storage, args.name)
return

status_queue = queue.SimpleQueue()
try:
model = create_model(
ert_config,
storage,
args,
status_queue,
)
except ValueError as e:
raise ErtCliError(e) from e
Expand All @@ -112,11 +115,6 @@ def run_cli(args: Namespace, _: Any = None) -> None:
target=model.start_simulations_thread,
args=(evaluator_server_config,),
)
thread.start()

tracker = EvaluatorTracker(
model, ee_con_info=evaluator_server_config.get_connection_info()
)

with contextlib.ExitStack() as exit_stack:
out: TextIO
Expand All @@ -127,13 +125,12 @@ def run_cli(args: Namespace, _: Any = None) -> None:
else:
out = sys.stderr
monitor = Monitor(out=out, color_always=args.color_always)

thread.start()
try:
monitor.monitor(tracker.track())
monitor.monitor(status_queue)
except (SystemExit, KeyboardInterrupt, OSError):
# _base_service.py translates CTRL-c to OSError
print("\nKilling simulations...")
tracker.request_termination()
model.cancel()

thread.join()
storage.close()
Expand Down
34 changes: 27 additions & 7 deletions src/ert/cli/model_factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from queue import SimpleQueue
from typing import TYPE_CHECKING

import numpy as np
Expand Down Expand Up @@ -53,6 +54,7 @@ def create_model(
config: ErtConfig,
storage: StorageAccessor,
args: Namespace,
status_queue: SimpleQueue,
) -> BaseRunModel:
logger = logging.getLogger(__name__)
logger.info(
Expand All @@ -73,24 +75,31 @@ def create_model(
)

if args.mode == TEST_RUN_MODE:
return _setup_single_test_run(config, storage, args)
return _setup_single_test_run(config, storage, args, status_queue)
elif args.mode == ENSEMBLE_EXPERIMENT_MODE:
return _setup_ensemble_experiment(config, storage, args)
return _setup_ensemble_experiment(config, storage, args, status_queue)
elif args.mode == ENSEMBLE_SMOOTHER_MODE:
return _setup_ensemble_smoother(config, storage, args, update_settings)
return _setup_ensemble_smoother(
config, storage, args, update_settings, status_queue
)
elif args.mode == ES_MDA_MODE:
return _setup_multiple_data_assimilation(config, storage, args, update_settings)
return _setup_multiple_data_assimilation(
config, storage, args, update_settings, status_queue
)
elif args.mode == ITERATIVE_ENSEMBLE_SMOOTHER_MODE:
return _setup_iterative_ensemble_smoother(
config, storage, args, update_settings
config, storage, args, update_settings, status_queue
)

else:
raise NotImplementedError(f"Run type not supported {args.mode}")


def _setup_single_test_run(
config: ErtConfig, storage: StorageAccessor, args: Namespace
config: ErtConfig,
storage: StorageAccessor,
args: Namespace,
status_queue: SimpleQueue,
) -> SingleTestRun:
return SingleTestRun(
SingleTestRunArguments(
Expand All @@ -102,11 +111,15 @@ def _setup_single_test_run(
),
config,
storage,
status_queue,
)


def _setup_ensemble_experiment(
config: ErtConfig, storage: StorageAccessor, args: Namespace
config: ErtConfig,
storage: StorageAccessor,
args: Namespace,
status_queue: SimpleQueue,
) -> EnsembleExperiment:
min_realizations_count = config.analysis_config.minimum_required_realizations
active_realizations = _realizations(args, config.model_config.num_realizations)
Expand All @@ -132,6 +145,7 @@ def _setup_ensemble_experiment(
config,
storage,
config.queue_config,
status_queue,
)


Expand All @@ -140,6 +154,7 @@ def _setup_ensemble_smoother(
storage: StorageAccessor,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue,
) -> EnsembleSmoother:
return EnsembleSmoother(
ESRunArguments(
Expand All @@ -158,6 +173,7 @@ def _setup_ensemble_smoother(
config.queue_config,
es_settings=config.analysis_config.es_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand All @@ -166,6 +182,7 @@ def _setup_multiple_data_assimilation(
storage: StorageAccessor,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue,
) -> MultipleDataAssimilation:
# Because the configuration of the CLI is different from the gui, we
# have a different way to get the restart information.
Expand Down Expand Up @@ -195,6 +212,7 @@ def _setup_multiple_data_assimilation(
prior_ensemble,
es_settings=config.analysis_config.es_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand All @@ -203,6 +221,7 @@ def _setup_iterative_ensemble_smoother(
storage: StorageAccessor,
args: Namespace,
update_settings: UpdateSettings,
status_queue: SimpleQueue,
) -> IteratedEnsembleSmoother:
return IteratedEnsembleSmoother(
SIESRunArguments(
Expand All @@ -223,6 +242,7 @@ def _setup_iterative_ensemble_smoother(
config.queue_config,
config.analysis_config.ies_module,
update_settings=update_settings,
status_queue=status_queue,
)


Expand Down
9 changes: 6 additions & 3 deletions src/ert/cli/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
import sys
from datetime import datetime, timedelta
from typing import Dict, Iterator, Optional, TextIO, Tuple, Union
from queue import SimpleQueue
from typing import Dict, Optional, TextIO, Tuple

from tqdm import tqdm

Expand Down Expand Up @@ -57,13 +58,15 @@ def __init__(self, out: TextIO = sys.stdout, color_always: bool = False) -> None

# The dot adds no value without color, so remove it.
self.dot = ""
self.done = False

def monitor(
self,
events: Iterator[Union[FullSnapshotEvent, SnapshotUpdateEvent, EndEvent]],
event_queue: SimpleQueue,
) -> None:
self._start_time = datetime.now()
for event in events:
while True:
event = event_queue.get()
if isinstance(event, FullSnapshotEvent):
if event.snapshot is not None:
self._snapshots[event.iteration] = event.snapshot
Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
)
from .config import EvaluatorServerConfig
from .evaluator import EnsembleEvaluator
from .evaluator_tracker import EvaluatorTracker
from .event import EndEvent, FullSnapshotEvent, SnapshotUpdateEvent
from .monitor import Monitor
from .snapshot import PartialSnapshot, Snapshot
Expand All @@ -18,7 +17,6 @@
"EnsembleBuilder",
"EnsembleEvaluator",
"EvaluatorServerConfig",
"EvaluatorTracker",
"ForwardModel",
"FullSnapshotEvent",
"Monitor",
Expand Down
9 changes: 5 additions & 4 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,10 @@ def _run_server(self, loop: asyncio.AbstractEventLoop) -> None:
loop.run_until_complete(self.evaluator_server())
logger.debug("Server thread exiting.")

def _start_running(self) -> None:
def start_running(self) -> None:
self._ws_thread.start()
self._ensemble.evaluate(self._config)
logger.debug("Started evaluator, joining until shutdown")

def _stop(self) -> None:
if not self._done.done():
Expand All @@ -418,11 +419,11 @@ def _signal_cancel(self) -> None:
logger.debug("Stopping current ensemble")
self._loop.call_soon_threadsafe(self._stop)

def run_and_get_successful_realizations(self) -> List[int]:
self._start_running()
logger.debug("Started evaluator, joining until shutdown")
def join(self) -> None:
self._ws_thread.join()
logger.debug("Evaluator is done")

def get_successful_realizations(self) -> List[int]:
return self._ensemble.get_successful_realizations()

@staticmethod
Expand Down
Loading

0 comments on commit 8cf334e

Please sign in to comment.