Skip to content

Commit

Permalink
Merge pull request cylc#6133 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Jun 13, 2024
2 parents 1a9c681 + 9ed0cc9 commit c43bf80
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 6 deletions.
1 change: 1 addition & 0 deletions changes.d/fix.6109.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug affecting job submission where the list of bad hosts was not always reset correctly.
9 changes: 7 additions & 2 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Dict,
Optional,
Sequence,
Set,
Union,
TYPE_CHECKING,
)
Expand Down Expand Up @@ -455,13 +456,17 @@ class NoPlatformsError(PlatformLookupError):
target.
place:
Where the attempt to get the platform failed.
"""
def __init__(
self, identity: str, set_type: str = 'group', place: str = ''
self,
identity: str,
hosts_consumed: Set[str],
set_type: str = 'group',
place: str = '',
):
self.identity = identity
self.type = set_type
self.hosts_consumed = hosts_consumed
if place:
self.place = f' during {place}.'
else:
Expand Down
9 changes: 7 additions & 2 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,14 @@ def get_platform_from_group(
else:
platform_names = group['platforms']

# Return False if there are no platforms available to be selected.
# If there are no platforms available to be selected:
if not platform_names:
raise NoPlatformsError(group_name)
hosts_consumed = {
host
for platform in group['platforms']
for host in platform_from_name(platform)['hosts']}
raise NoPlatformsError(
group_name, hosts_consumed)

# Get the selection method
method = group['selection']['method']
Expand Down
12 changes: 11 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,15 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,

if not prepared_tasks:
return bad_tasks

auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
platform_name = itask.platform['name']
auth_itasks.setdefault(platform_name, [])
auth_itasks[platform_name].append(itask)
# Submit task jobs for each platform
# Non-prepared tasks can be considered done for now:
done_tasks = bad_tasks

for _, itasks in sorted(auth_itasks.items()):
Expand Down Expand Up @@ -1107,7 +1109,7 @@ def _prep_submit_task_job(
Returns:
* itask - preparation complete.
* None - preparation in progress.
* False - perparation failed.
* False - preparation failed.
"""
if itask.local_job_file_path:
Expand Down Expand Up @@ -1201,6 +1203,14 @@ def _prep_submit_task_job(
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
if isinstance(exc, NoPlatformsError):
# Clear all hosts from all platforms in group from
# bad_hosts:
self.bad_hosts -= exc.hosts_consumed
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
return False
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ def remote_tidy(self) -> None:
else:
LOG.error(
NoPlatformsError(
install_target, 'install target', 'remote tidy'))
install_target,
set(),
'install target',
'remote tidy'))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while queue and time() < timeout:
Expand Down
53 changes: 53 additions & 0 deletions tests/integration/test_platforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Integration testing for platforms functionality."""


async def test_prep_submit_task_tries_multiple_platforms(
    flow, scheduler, start, mock_glbl_cfg
):
    """Check that bad hosts are reset once a platform group is exhausted.

    The task's platform setting names a platform group, and every host
    of every platform in that group is flagged as bad beforehand. Job
    preparation is then expected to fail and to leave the set of bad
    hosts empty.

    See https://github.com/cylc/cylc-flow/pull/6109
    """
    # Two single-host platforms gathered into one platform group.
    platform_group_cfg = '''
[platforms]
[[myplatform]]
hosts = broken
[[anotherbad]]
hosts = broken2
[platform groups]
[[mygroup]]
platforms = myplatform, anotherbad'''
    mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', platform_group_cfg)

    # A single task whose platform setting is the group defined above.
    workflow_id = flow({
        "scheduling": {"graph": {"R1": "foo"}},
        "runtime": {"foo": {"platform": "mygroup"}}
    })
    schd = scheduler(workflow_id, run_mode='live')

    async with start(schd):
        task = schd.pool.get_tasks()[0]
        task.submit_num = 1

        job_mgr = schd.task_job_mgr
        # Pretend that contacting each job host has already failed.
        job_mgr.bad_hosts = {'broken', 'broken2'}

        # Preparation reports failure (False) ...
        assert job_mgr._prep_submit_task_job(schd.workflow, task) is False
        # ... and the bad hosts have been cleared afterwards.
        assert not job_mgr.bad_hosts

0 comments on commit c43bf80

Please sign in to comment.