Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
  • Loading branch information
wxtim and MetRonnie committed May 28, 2024
1 parent d30ac43 commit 14ed16a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def get_platform_from_group(
else:
platform_names = group['platforms']

# Return False if there are no platforms available to be selected.
# If there are no platforms available to be selected:
if not platform_names:
hosts_consumed = [
host
Expand Down
15 changes: 6 additions & 9 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
from shutil import rmtree
from time import time
from typing import TYPE_CHECKING, Any, Union, Optional
from typing import TYPE_CHECKING, Any, List, Union, Optional

from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
Expand Down Expand Up @@ -230,7 +230,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
"""
prepared_tasks = []
bad_tasks = []
out_of_hosts_tasks: list = []
out_of_hosts_tasks: List[TaskProxy] = []
for itask in itasks:
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
Expand All @@ -240,9 +240,8 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task is True:
if isinstance(prep_task, NoPlatformsError):
# This is a task whose platform has run out of hosts
# it's neither bad or good.
out_of_hosts_tasks.append(itask)
elif prep_task:
prepared_tasks.append(itask)
Expand Down Expand Up @@ -288,10 +287,8 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
auth_itasks.setdefault(platform_name, [])
auth_itasks[platform_name].append(itask)
# Submit task jobs for each platform
done_tasks = bad_tasks

# Out of host tasks can be considered done for now:
[done_tasks.append(itask) for itask in out_of_hosts_tasks]
# Non-prepared tasks can be considered done for now:
done_tasks = [*bad_tasks, *out_of_hosts_tasks]

for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
Expand Down Expand Up @@ -1201,7 +1198,7 @@ def _prep_submit_task_job(
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
return True
return exc
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
Expand Down
41 changes: 18 additions & 23 deletions tests/integration/test_platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,20 @@
"""Integration testing for platforms functionality.
"""

from contextlib import suppress

async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
from cylc.flow.exceptions import NoPlatformsError
from cylc.flow.task_job_logs import get_task_job_activity_log


async def test_prep_submit_task_tries_multiple_platforms(
flow, scheduler, run, mock_glbl_cfg, monkeypatch, tmp_path
):
"""Preparation tries multiple platforms within a group if the
task platform setting matches a group, and that after all platforms
have been tried that the hosts matching that platform group are
cleared.
"""
global_conf = '''
[platforms]
[[myplatform]]
Expand All @@ -30,31 +42,14 @@ async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf)

wid = flow({
"scheduling": {
"graph": {
"R1": "foo"
}
},
"runtime": {
"root": {},
"print-config": {
"script": "cylc config"
},
"foo": {
"script": "sleep 10",
"platform": "mygroup",
"submission retry delays": '3*PT5S'
}
}
"scheduling": {"graph": {"R1": "foo"}},
"runtime": {"foo": {"platform": "mygroup"}}
})
validate(wid)
schd = scheduler(wid, paused_start=False, run_mode='live')
async with run(schd) as log:
async with run(schd):
itask = schd.pool.get_tasks()[0]

# Avoid breaking on trying to create log file path:
schd.task_job_mgr._create_job_log_path = lambda *_: None
itask.submit_num = 1
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
assert res is True
assert isinstance(res, NoPlatformsError)
assert not schd.task_job_mgr.bad_hosts

0 comments on commit 14ed16a

Please sign in to comment.