Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

platforms: fix unreachable hosts not reset on platform group failure #6109

Merged
merged 13 commits into from
Jun 12, 2024
1 change: 1 addition & 0 deletions changes.d/fix.6109.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug affecting job submission where the list of bad hosts was not always reset correctly.
9 changes: 8 additions & 1 deletion cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Callable,
Dict,
Iterable,
Set,
NoReturn,
Optional,
Tuple,
Expand Down Expand Up @@ -444,15 +445,21 @@ class NoPlatformsError(PlatformLookupError):

Args:
identity: The name of the platform group or install target
hosts_consumed: Hosts which have already been tried.
set_type: Whether the set of platforms is a platform group or an
install target
place: Where the attempt to get the platform failed.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
"""
def __init__(
self, identity: str, set_type: str = 'group', place: str = ''
self,
identity: str,
hosts_consumed: Set[str],
set_type: str = 'group',
place: str = '',
):
self.identity = identity
self.type = set_type
self.hosts_consumed = hosts_consumed
if place:
self.place = f' during {place}.'
else:
Expand Down
9 changes: 7 additions & 2 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,14 @@ def get_platform_from_group(
else:
platform_names = group['platforms']

# Return False if there are no platforms available to be selected.
# If there are no platforms available to be selected:
if not platform_names:
raise NoPlatformsError(group_name)
hosts_consumed = {
host
for platform in group['platforms']
for host in platform_from_name(platform)['hosts']}
raise NoPlatformsError(
group_name, hosts_consumed)

# Get the selection method
method = group['selection']['method']
Expand Down
13 changes: 12 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,21 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Prepare tasks for job submission
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
workflow, itasks)

# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()

if not prepared_tasks:
return bad_tasks

auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
platform_name = itask.platform['name']
auth_itasks.setdefault(platform_name, [])
auth_itasks[platform_name].append(itask)
# Submit task jobs for each platform
# Non-prepared tasks can be considered done for now:
done_tasks = bad_tasks

for _, itasks in sorted(auth_itasks.items()):
Expand Down Expand Up @@ -1087,7 +1090,7 @@ def _prep_submit_task_job(
Returns:
* itask - preparation complete.
* None - preparation in progress.
* False - perparation failed.
* False - preparation failed.
wxtim marked this conversation as resolved.
Show resolved Hide resolved

"""
if itask.local_job_file_path:
Expand Down Expand Up @@ -1181,6 +1184,14 @@ def _prep_submit_task_job(
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
if isinstance(exc, NoPlatformsError):
# Clear all hosts from all platforms in group from
# bad_hosts:
self.bad_hosts -= exc.hosts_consumed
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
return False
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ def remote_tidy(self) -> None:
else:
LOG.error(
NoPlatformsError(
install_target, 'install target', 'remote tidy'))
install_target,
set(),
'install target',
'remote tidy'))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while queue and time() < timeout:
Expand Down
53 changes: 53 additions & 0 deletions tests/integration/test_platforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Integration testing for platforms functionality."""


async def test_prep_submit_task_tries_multiple_platforms(
flow, scheduler, start, mock_glbl_cfg
):
"""Preparation tries multiple platforms within a group if the
task platform setting matches a group, and that after all platforms
have been tried that the hosts matching that platform group are
cleared.
wxtim marked this conversation as resolved.
Show resolved Hide resolved

See https://github.com/cylc/cylc-flow/pull/6109
"""
global_conf = '''
[platforms]
[[myplatform]]
hosts = broken
[[anotherbad]]
hosts = broken2
[platform groups]
[[mygroup]]
platforms = myplatform, anotherbad'''
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf)

wid = flow({
"scheduling": {"graph": {"R1": "foo"}},
"runtime": {"foo": {"platform": "mygroup"}}
})
schd = scheduler(wid, run_mode='live')
async with start(schd):
itask = schd.pool.get_tasks()[0]
itask.submit_num = 1
# simulate failed attempts to contact the job hosts
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
wxtim marked this conversation as resolved.
Show resolved Hide resolved
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
assert res is False
# ensure the bad hosts have been cleared
assert not schd.task_job_mgr.bad_hosts
wxtim marked this conversation as resolved.
Show resolved Hide resolved
Loading