Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment server #2132

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ert_gui/simulation/run_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
)
from ert_shared.status.tracker.factory import create_tracker
from ert_shared.status.utils import format_running_time
from ert_shared.ensemble_evaluator.evaluator import EnsembleEvaluatorService
from ert_shared.feature_toggling import FeatureToggling
from qtpy.QtCore import QModelIndex, QSize, Qt, QThread, QTimer, Signal, Slot
from qtpy.QtWidgets import (
QDialog,
Expand Down Expand Up @@ -270,10 +272,16 @@ def run():

self._ticker.start(1000)

ee_config = (
EnsembleEvaluatorService.get_config()
if FeatureToggling.is_enabled("ensemble-evaluator")
else None
)

tracker = create_tracker(
self._run_model,
num_realizations=self._simulations_argments["active_realizations"].count(),
ee_config=self._simulations_argments.get("ee_config", None),
ee_config=ee_config,
)

worker = TrackerWorker(tracker)
Expand Down
7 changes: 0 additions & 7 deletions ert_gui/simulation/simulation_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,13 @@
SimulationConfigPanel,
)
from ert_gui.simulation import RunDialog
from ert_shared.feature_toggling import FeatureToggling
from collections import OrderedDict
from ert_shared.ensemble_evaluator.config import EvaluatorServerConfig


class SimulationPanel(QWidget):
def __init__(self, config_file):
QWidget.__init__(self)
self._config_file = config_file
self._ee_config = None
if FeatureToggling.is_enabled("ensemble-evaluator"):
self._ee_config = EvaluatorServerConfig()

self.setObjectName("Simulation_panel")
layout = QVBoxLayout()
Expand Down Expand Up @@ -110,8 +105,6 @@ def getSimulationArguments(self):
"""@rtype: dict[str,object]"""
simulation_widget = self._simulation_widgets[self.getCurrentSimulationModel()]
args = simulation_widget.getSimulationArguments()
if self._ee_config is not None:
args.update({"ee_config": self._ee_config})
return args

def runSimulation(self):
Expand Down
11 changes: 6 additions & 5 deletions ert_shared/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ert_shared.cli.monitor import Monitor
from ert_shared.cli.notifier import ErtCliNotifier
from ert_shared.cli.workflow import execute_workflow
from ert_shared.ensemble_evaluator.config import EvaluatorServerConfig
from ert_shared.ensemble_evaluator.evaluator import EnsembleEvaluatorService
from ert_shared.feature_toggling import FeatureToggling
from ert_shared.status.tracker.factory import create_tracker
from res.enkf import EnKFMain, ResConfig
Expand Down Expand Up @@ -63,10 +63,11 @@ def run_cli(args):
clear_global_state()
raise ErtCliError(msg)

ee_config = None
if FeatureToggling.is_enabled("ensemble-evaluator"):
ee_config = EvaluatorServerConfig(custom_port_range=args.port_range)
argument.update({"ee_config": ee_config})
ee_config = (
EnsembleEvaluatorService.get_config()
if FeatureToggling.is_enabled("ensemble-evaluator")
else None
)

thread = threading.Thread(
name="ert_cli_simulation_thread",
Expand Down
6 changes: 3 additions & 3 deletions ert_shared/ensemble_evaluator/ensemble/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def on_timeout(callback_args):
)
timeout_queue.put_nowait(timeout_cloudevent)

dispatch_url = self._config.dispatch_uri
dispatch_url = f"{self._config.dispatch_uri}/{self._ee_id}"
cert = self._config.cert
token = self._config.token

Expand Down Expand Up @@ -86,7 +86,7 @@ def evaluate(self, config, ee_id):
def _evaluate(self):
asyncio.set_event_loop(asyncio.new_event_loop())

dispatch_url = self._config.dispatch_uri
dispatch_url = f"{self._config.dispatch_uri}/{self._ee_id}"
cert = self._config.cert
token = self._config.token
try:
Expand Down Expand Up @@ -216,7 +216,7 @@ def _cancel(self):
)
asyncio.get_event_loop().run_until_complete(
self.send_cloudevent(
self._config.dispatch_uri,
f"{self._config.dispatch_uri}/{self._ee_id}",
out_cloudevent,
token=self._config.token,
cert=self._config.cert,
Expand Down
24 changes: 19 additions & 5 deletions ert_shared/ensemble_evaluator/ensemble/prefect.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ def evaluate(self, config: EvaluatorServerConfig, ee_id: str):
def _evaluate(self, ee_config: EvaluatorServerConfig, ee_id):
asyncio.set_event_loop(asyncio.get_event_loop())
try:
with Client(ee_config.dispatch_uri, ee_config.token, ee_config.cert) as c:
with Client(
f"{ee_config.dispatch_uri}/{self._ee_id}",
ee_config.token,
ee_config.cert,
) as c:
event = CloudEvent(
{
"type": ids.EVTYPE_ENSEMBLE_STARTED,
Expand All @@ -219,11 +223,17 @@ def _evaluate(self, ee_config: EvaluatorServerConfig, ee_id):
)
c.send(to_json(event).decode())
with prefect.context(
url=ee_config.dispatch_uri, token=ee_config.token, cert=ee_config.cert
url=f"{ee_config.dispatch_uri}/{self._ee_id}",
token=ee_config.token,
cert=ee_config.cert,
):
self.run_flow(ee_id)

with Client(ee_config.dispatch_uri, ee_config.token, ee_config.cert) as c:
with Client(
f"{ee_config.dispatch_uri}/{self._ee_id}",
ee_config.token,
ee_config.cert,
) as c:
event = CloudEvent(
{
"type": ids.EVTYPE_ENSEMBLE_STOPPED,
Expand All @@ -238,7 +248,11 @@ def _evaluate(self, ee_config: EvaluatorServerConfig, ee_id):
"An exception occurred while starting the ensemble evaluation",
exc_info=True,
)
with Client(ee_config.dispatch_uri, ee_config.token, ee_config.cert) as c:
with Client(
f"{ee_config.dispatch_uri}/{self._ee_id}",
ee_config.token,
ee_config.cert,
) as c:
event = CloudEvent(
{
"type": ids.EVTYPE_ENSEMBLE_FAILED,
Expand Down Expand Up @@ -303,7 +317,7 @@ def _cancel(self):
loop = asyncio.new_event_loop()
loop.run_until_complete(
self.send_cloudevent(
self._ee_config.dispatch_uri,
f"{ee_config.dispatch_uri}/{self._ee_id}",
event,
token=self._ee_config.token,
cert=self._ee_config.cert,
Expand Down
9 changes: 9 additions & 0 deletions ert_shared/ensemble_evaluator/entity/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@
EVTYPE_EE_USER_CANCEL = "com.equinor.ert.ee.user_cancel"
EVTYPE_EE_USER_DONE = "com.equinor.ert.ee.user_done"

EVTYPE_EE_EXPERIMENT_UPDATE = "com.equinor.ert.ee.experiment.update"
EVTYPE_EE_EXPERIMENT_TERMINATED = "com.equinor.ert.ee.experiment.terminated"

EVTYPE_EXPERIMENT_SUBMITTED = "com.equinor.ert.experiment.submitted"
EVTYPE_EXPERIMENT_RUNNING = "com.equinor.ert.experiment.running"
EVTYPE_EXPERIMENT_SUCCESS = "com.equinor.ert.experiment.success"
EVTYPE_EXPERIMENT_FAILURE = "com.equinor.ert.experiment.failure"


EVTYPE_ENSEMBLE_STARTED = "com.equinor.ert.ensemble.started"
EVTYPE_ENSEMBLE_STOPPED = "com.equinor.ert.ensemble.stopped"
EVTYPE_ENSEMBLE_CANCELLED = "com.equinor.ert.ensemble.cancelled"
Expand Down
4 changes: 4 additions & 0 deletions ert_shared/ensemble_evaluator/entity/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ def get_step_id(source):

def get_job_id(source):
return _match_token("job", source)


def get_evaluation_id(source):
return _match_token("ee", source)
Loading