This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Adapt to new event model
As a part of ert#1220, steps are now the main unit of work in the
ensemble evaluator. Queues therefore execute steps and communicate about
steps, while job_dispatch executes jobs and communicates about jobs.

Co-authored-by: Dan Sava <dsav@equinor.com>
Co-authored-by: Jonas G. Drange <jond@equinor.com>
Co-authored-by: Sondre Sortland <sonso@equinor.com>
3 people committed Apr 16, 2021
1 parent da6c7d2 commit 81bfcb2
Showing 5 changed files with 55 additions and 131 deletions.
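
For context, below is a minimal sketch (not part of this commit) of a job-level event as job_dispatch emits it under the new addressing scheme. It assumes the cloudevents package whose CloudEvent and to_json helpers event.py already uses; the imports are not visible in this diff excerpt, and the ids and payload values are illustrative placeholders rather than values taken from the change.

from cloudevents.http import CloudEvent, to_json

# Illustrative ids; in event.py they come from the Init message.
ee_id, real_id, step_id, job_index = "ee_id", 0, 0, 0

# Steps replace stages in the source path, so a job event is now addressed as
# /ert/ee/<ee_id>/real/<real_id>/step/<step_id>/job/<index>.
event = CloudEvent(
    {
        "type": "com.equinor.ert.forward_model_job.failure",
        "source": f"/ert/ee/{ee_id}/real/{real_id}/step/{step_id}/job/{job_index}",
        "datacontenttype": "application/json",
    },
    {"error_msg": "massive_failure"},
)

# event.py serializes events the same way before sending them over the websocket.
print(to_json(event).decode())

Step-level events (the _FM_STEP_* types) are now emitted by the queue in queue.py instead, addressed down to the step segment of the same path.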
58 changes: 4 additions & 54 deletions python/job_runner/reporting/event.py
@@ -15,10 +15,6 @@
_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success"
_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure"

_FM_STEP_START = "com.equinor.ert.forward_model_step.start"
_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure"
_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success"


class TransitionError(ValueError):
pass
@@ -30,7 +26,7 @@ def __init__(self, evaluator_url):

self._ee_id = None
self._real_id = None
self._stage_id = None
self._step_id = None

self._initialize_state_machine()

@@ -41,7 +37,7 @@ def _initialize_state_machine(self):
self._states = {
initialized: self._init_handler,
jobs: self._job_handler,
finished: self._end_handler,
finished: lambda _: _,
}
self._transitions = {
None: initialized,
@@ -71,33 +67,12 @@ def _dump_event(self, event):
client.send(to_json(event).decode())

def _step_path(self):
return f"/ert/ee/{self._ee_id}/real/{self._real_id}/stage/{self._stage_id}/step/{0}"
return f"/ert/ee/{self._ee_id}/real/{self._real_id}/step/{self._step_id}"

def _init_handler(self, msg):
self._ee_id = msg.ee_id
self._real_id = msg.real_id
self._stage_id = msg.stage_id

jobs = {}
for job in msg.jobs:
jobs[job.index] = job.job_data.copy()
if job.job_data.get("stderr"):
jobs[job.index]["stderr"] = str(Path(job.job_data["stderr"]).resolve())
if job.job_data.get("stdout"):
jobs[job.index]["stdout"] = str(Path(job.job_data["stdout"]).resolve())

self._dump_event(
CloudEvent(
{
"type": _FM_STEP_START,
"source": self._step_path(),
"datacontenttype": "application/json",
},
{
"jobs": jobs,
},
)
)
self._step_id = msg.step_id

def _job_handler(self, msg):
job_path = f"{self._step_path()}/job/{msg.job.index}"
@@ -166,28 +141,3 @@ def _job_handler(self, msg):
},
)
)

def _end_handler(self, msg):
step_path = self._step_path()
if msg.success():
self._dump_event(
CloudEvent(
{
"type": _FM_STEP_SUCCESS,
"source": step_path,
}
)
)
else:
self._dump_event(
CloudEvent(
{
"type": _FM_STEP_FAILURE,
"source": step_path,
"datacontenttype": "application/json",
},
{
"error_msg": msg.error_message,
},
)
)
4 changes: 2 additions & 2 deletions python/job_runner/reporting/message.py
@@ -36,14 +36,14 @@ def success(self):


class Init(Message):
def __init__(self, jobs, run_id, ert_pid, ee_id=None, real_id=None, stage_id=None):
def __init__(self, jobs, run_id, ert_pid, ee_id=None, real_id=None, step_id=None):
super(Init, self).__init__()
self.jobs = jobs
self.run_id = run_id
self.ert_pid = ert_pid
self.ee_id = ee_id
self.real_id = real_id
self.stage_id = stage_id
self.step_id = step_id


class Finish(Message):
4 changes: 2 additions & 2 deletions python/job_runner/runner.py
@@ -16,7 +16,7 @@ def __init__(self, jobs_data):
self.simulation_id = jobs_data.get("run_id")
self.ee_id = jobs_data.get("ee_id")
self.real_id = jobs_data.get("real_id")
self.stage_id = jobs_data.get("stage_id")
self.step_id = jobs_data.get("step_id")
self.ert_pid = jobs_data.get("ert_pid")
self.global_environment = jobs_data.get("global_environment")
self.global_update_path = jobs_data.get("global_update_path")
Expand Down Expand Up @@ -46,7 +46,7 @@ def run(self, names_of_jobs_to_run):
self.ert_pid,
self.ee_id,
self.real_id,
self.stage_id,
self.step_id,
)

unused = set(names_of_jobs_to_run) - set([j.name() for j in job_queue])
50 changes: 26 additions & 24 deletions python/res/job_queue/queue.py
@@ -36,30 +36,32 @@

LONG_RUNNING_FACTOR = 1.25

_FM_STAGE_WAITING = "com.equinor.ert.forward_model_stage.waiting"
_FM_STAGE_PENDING = "com.equinor.ert.forward_model_stage.pending"
_FM_STAGE_RUNNING = "com.equinor.ert.forward_model_stage.running"
_FM_STAGE_FAILURE = "com.equinor.ert.forward_model_stage.failure"
_FM_STAGE_SUCCESS = "com.equinor.ert.forward_model_stage.success"
_FM_STAGE_UNKNOWN = "com.equinor.ert.forward_model_stage.unknown"

_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure"
_FM_STEP_PENDING = "com.equinor.ert.forward_model_step.pending"
_FM_STEP_RUNNING = "com.equinor.ert.forward_model_step.running"
_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success"
_FM_STEP_UNKNOWN = "com.equinor.ert.forward_model_step.unknown"
_FM_STEP_WAITING = "com.equinor.ert.forward_model_step.waiting"


_queue_state_to_event_type_map = {
"JOB_QUEUE_NOT_ACTIVE": _FM_STAGE_WAITING,
"JOB_QUEUE_WAITING": _FM_STAGE_WAITING,
"JOB_QUEUE_SUBMITTED": _FM_STAGE_WAITING,
"JOB_QUEUE_PENDING": _FM_STAGE_PENDING,
"JOB_QUEUE_RUNNING": _FM_STAGE_RUNNING,
"JOB_QUEUE_DONE": _FM_STAGE_RUNNING,
"JOB_QUEUE_EXIT": _FM_STAGE_RUNNING,
"JOB_QUEUE_IS_KILLED": _FM_STAGE_FAILURE,
"JOB_QUEUE_DO_KILL": _FM_STAGE_FAILURE,
"JOB_QUEUE_SUCCESS": _FM_STAGE_SUCCESS,
"JOB_QUEUE_RUNNING_DONE_CALLBACK": _FM_STAGE_RUNNING,
"JOB_QUEUE_RUNNING_EXIT_CALLBACK": _FM_STAGE_RUNNING,
"JOB_QUEUE_STATUS_FAILURE": _FM_STAGE_UNKNOWN,
"JOB_QUEUE_FAILED": _FM_STAGE_FAILURE,
"JOB_QUEUE_DO_KILL_NODE_FAILURE": _FM_STAGE_FAILURE,
"JOB_QUEUE_UNKNOWN": _FM_STAGE_UNKNOWN,
"JOB_QUEUE_NOT_ACTIVE": _FM_STEP_WAITING,
"JOB_QUEUE_WAITING": _FM_STEP_WAITING,
"JOB_QUEUE_SUBMITTED": _FM_STEP_WAITING,
"JOB_QUEUE_PENDING": _FM_STEP_PENDING,
"JOB_QUEUE_RUNNING": _FM_STEP_RUNNING,
"JOB_QUEUE_DONE": _FM_STEP_RUNNING,
"JOB_QUEUE_EXIT": _FM_STEP_RUNNING,
"JOB_QUEUE_IS_KILLED": _FM_STEP_FAILURE,
"JOB_QUEUE_DO_KILL": _FM_STEP_FAILURE,
"JOB_QUEUE_SUCCESS": _FM_STEP_SUCCESS,
"JOB_QUEUE_RUNNING_DONE_CALLBACK": _FM_STEP_RUNNING,
"JOB_QUEUE_RUNNING_EXIT_CALLBACK": _FM_STEP_RUNNING,
"JOB_QUEUE_STATUS_FAILURE": _FM_STEP_UNKNOWN,
"JOB_QUEUE_FAILED": _FM_STEP_FAILURE,
"JOB_QUEUE_DO_KILL_NODE_FAILURE": _FM_STEP_FAILURE,
"JOB_QUEUE_UNKNOWN": _FM_STEP_UNKNOWN,
}


@@ -413,7 +415,7 @@ def _translate_change_to_cloudevent(real_id, status):
return CloudEvent(
{
"type": _queue_state_event_type(status),
"source": f"/ert/ee/{0}/real/{real_id}/stage/{0}",
"source": f"/ert/ee/{0}/real/{real_id}/step/{0}",
"datacontenttype": "application/json",
},
{
@@ -573,7 +575,7 @@ def add_ensemble_evaluator_information_to_jobs_file(self, ee_id, dispatch_url):
data = json.load(jobs_file)
data["ee_id"] = ee_id
data["real_id"] = self._qindex_to_iens[q_index]
data["stage_id"] = 0
data["step_id"] = 0
data["dispatch_url"] = dispatch_url
jobs_file.seek(0)
jobs_file.truncate()
70 changes: 21 additions & 49 deletions tests/job_runner/test_event_reporter.py
@@ -10,51 +10,28 @@
_FM_JOB_RUNNING,
_FM_JOB_START,
_FM_JOB_SUCCESS,
_FM_STEP_FAILURE,
_FM_STEP_START,
_FM_STEP_SUCCESS,
)
from job_runner.reporting.message import Exited, Finish, Init, Running, Start
import json

from tests.utils import _mock_ws_thread


def test_report_with_init_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))

assert len(lines) == 1
event = json.loads(lines[0])
job = event.get("data", {}).get("jobs", {}).get("0", {})
assert job
assert job["name"] == "job1"
assert job["stdout"].startswith("/") and job["stdout"].endswith("stdout")
assert job["stderr"].startswith("/") and job["stderr"].endswith("stderr")
assert event["type"] == _FM_STEP_START
assert event["source"] == "/ert/ee/ee_id/real/0/stage/0/step/0"


def test_report_with_successful_start_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))
msg = Start(job1)
reporter.report(msg)

assert len(lines) == 2
event = json.loads(lines[1])
assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_START
assert event["source"] == "/ert/ee/ee_id/real/0/stage/0/step/0/job/0"
assert event["source"] == "/ert/ee/ee_id/real/0/step/0/job/0"


def test_report_with_failed_start_message_argument(unused_tcp_port):
@@ -66,14 +43,14 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))

msg = Start(job1).with_error("massive_failure")

reporter.report(msg)

assert len(lines) == 3
event = json.loads(lines[2])
assert len(lines) == 2
event = json.loads(lines[1])
assert event["type"] == _FM_JOB_FAILURE
assert event["data"]["error_msg"] == "massive_failure"

@@ -86,11 +63,11 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))
reporter.report(Exited(job1, 0))

assert len(lines) == 2
event = json.loads(lines[1])
assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_SUCCESS


@@ -102,11 +79,11 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))
reporter.report(Exited(job1, 1).with_error("massive_failure"))

assert len(lines) == 2
event = json.loads(lines[1])
assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_FAILURE
assert event["data"]["error_msg"] == "massive_failure"

@@ -119,31 +96,29 @@ def test_report_with_running_message_argument(unused_tcp_port):

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))
reporter.report(Running(job1, 100, 10))

assert len(lines) == 2
event = json.loads(lines[1])
assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_RUNNING
assert event["data"]["max_memory_usage"] == 100
assert event["data"]["current_memory_usage"] == 10


def test_report_with_successful_finish_message_argument(unused_tcp_port):
def test_report_only_job_running_for_successful_run(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))
reporter.report(Running(job1, 100, 10))
reporter.report(Finish())

assert len(lines) == 3
event = json.loads(lines[2])
assert event["type"] == _FM_STEP_SUCCESS
assert len(lines) == 1


def test_report_with_failed_finish_message_argument(unused_tcp_port):
@@ -154,11 +129,8 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port):

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, step_id=0))
reporter.report(Running(job1, 100, 10))
reporter.report(Finish().with_error("massive_failure"))

assert len(lines) == 3
event = json.loads(lines[2])
assert event["type"] == _FM_STEP_FAILURE
assert event["data"]["error_msg"] == "massive_failure"
assert len(lines) == 1
