-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use queue to exchange events between scheduler and evaluator #8424
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #8424 +/- ##
=======================================
Coverage 90.60% 90.61%
=======================================
Files 342 342
Lines 20904 20849 -55
=======================================
- Hits 18941 18892 -49
+ Misses 1963 1957 -6
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
f922eac
to
f5e7c68
Compare
1482331
to
ca4ea6a
Compare
# setup message queues | ||
self._ensemble.scheduler_queue = self._events | ||
self._ensemble.manifest_queue = self._manifest_queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be moved to the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it can't as these queues are passed by the evaluator.
self.scheduler_queue: Any = None | ||
self.manifest_queue: Any = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need two queues, or can we have a general one and specify cloudevent types instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to two queues yes, but can update to use Queue[CloudEvent]
instead 👍
src/ert/scheduler/scheduler.py
Outdated
event = await self._events.get() | ||
await self._ee_queue.put(event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we are only moving the events from one queue to the next one. Can we pass self._ee_queue
to the job/driver instead of self._events
? Then we could get rid of the _publisher
task completely
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently it is required, since _ee_queue
is a consumer a does not need to be set.
So is the scheduler_queue for communication |
Yes, it is described in the PR 😸 |
@@ -1,6 +1,7 @@ | |||
import asyncio | |||
import logging | |||
import pickle | |||
import traceback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debug leftover
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
traceback is now used to log more info in case something crashed in the evaluator tasks.
@@ -311,9 +313,11 @@ async def forward_checksum(self, event: CloudEvent) -> None: | |||
}, | |||
{event["run_path"]: event.data}, | |||
) | |||
# currently clients still need to receive events via ws |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(what clients?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A good one! Currently the only client using the EVTYPE_FORWARD_MODEL_CHECKSUM
is in the test:
tests/unit_tests/ensemble_evaluator/test_scheduler.py::test_scheduler_receives_checksum_and_waits_for_disk_sync
but both me and Dan don't know how to rewrite this without the websockets since monitor is still based on ws.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the forward_model_runner is still intending to use websocket for the communication of checksum events. Then it sounds odd to use the word "currently"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, particularly, it has nothing to do with job_dispatch, but it is kept only for the sake of test. As this one only informs the test (described above) that the checksum has been received, since scheduler has now only the manifest_queue to trigger verification of checksum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, but not pretty 🤯
@@ -417,6 +421,9 @@ async def _start_running(self) -> None: | |||
] | |||
# now we wait for the server to actually start | |||
await self._server_started.wait() | |||
# setup message queues | |||
self._ensemble.scheduler_queue = self._events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename self._events
to something more precise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we can :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: I've kept the self._events
but made Ensemble
stateless wrt. message queues. They are sent as attribute in ensemble.evaluate
function.
f"Exception in evaluator task {task.get_name()}: {task_exception}\n" | ||
f"Traceback: {exc_traceback}" | ||
) | ||
) | ||
raise task_exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tre traceback is not printed by this latter raise
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately no. Have been testing it (crashing tasks) and traceback needs to be there.
src/ert/scheduler/job.py
Outdated
@@ -164,7 +163,7 @@ async def run(self, sem: asyncio.BoundedSemaphore, max_submit: int = 1) -> None: | |||
break | |||
|
|||
if self.returncode.result() == 0: | |||
if self._scheduler.wait_for_checksum(): | |||
if self._scheduler._manifest_queue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at first sight, this looks like there can be a bug introduced when the manifest queue is not yet populated, then it will skip it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure how the bug can be introduced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I misread the code. Should we write if self._scheduler._manifest_queue is not None
to be more explicit? (I read it like we were testing whether there is anything in the queue)
@@ -92,6 +85,9 @@ def __init__( | |||
ee_token: Optional[str] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These ee_*
class variables can be removed I suppose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but _update_jobs_json
needs to be moved to LegacyEnsemble
, which I thought could be a different PR?
src/ert/scheduler/scheduler.py
Outdated
f"{self._events.qsize()} items left unprocessed in the queue!" | ||
) | ||
if self._ee_queue: | ||
# only join queue if there is a consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find "join" to be a strange word for what is happening here, so in a comment I would write something about waiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do 👍
3c306b3
to
f5ff029
Compare
@@ -417,10 +421,14 @@ async def _start_running(self) -> None: | |||
] | |||
# now we wait for the server to actually start | |||
await self._server_started.wait() | |||
# let's run | |||
|
|||
# let's run with message queues set |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that comment should be redundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right 😄 that was more emotional comment than helpful really
src/ert/scheduler/scheduler.py
Outdated
@@ -208,79 +188,21 @@ def count_states(self) -> Dict[JobState, int]: | |||
return counts | |||
|
|||
async def _checksum_consumer(self) -> None: | |||
if not self._ee_uri: | |||
if not self._manifest_queue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self._manifest_queue is None
src/ert/scheduler/scheduler.py
Outdated
|
||
async def _publisher(self) -> None: | ||
if not self._ee_uri: | ||
if not self._ensemble_evaluator_queue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check for None-ness
src/ert/scheduler/scheduler.py
Outdated
logger.error( | ||
f"{self._events.qsize()} items left unprocessed in the queue!" | ||
) | ||
if self._ensemble_evaluator_queue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is not None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've checked and queue, even created but empty will be fine with this if.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for readability
src/ert/scheduler/scheduler.py
Outdated
scheduling_tasks = [ | ||
asyncio.create_task(self._publisher(), name="publisher_task"), | ||
asyncio.create_task( | ||
self._process_event_queue(), name="process_event_queue_task" | ||
), | ||
asyncio.create_task(self.driver.poll(), name="poll_task"), | ||
asyncio.create_task(self._checksum_consumer(), name="consumer_task"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name="checksum_consumer_task"
It is replaced by two message queues, which are provided in LegacyEnsemble.evaluate function by the ensemble evaluator and passed over to scheduler: - ensemble_evaluator_queue: responsible for providing CloudEvent (representing realization and driver events) for evaluator - manifest_queue: responsible for providing CloudEvent (representing notification manifest checksum Event) for scheduler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yay!
Issue
Resolves #7721Resolves #8485
Approach
This PR removes websockets based communication between scheduler and ensemble evaluator. It is replaced by two message queues, which are provided in
LegacyEnsemble.evaluate
function by the ensemble evaluator and passed over to scheduler:scheduler_queue
: responsible for providingCloudEvent
(representing realization and driver events) for evaluatormanifest_queue
: responsible for providingCloudEvent
(representing notification manifest checksum Event) for scheduler(Screenshot of new behavior in GUI if applicable)
When applicable