Skip to content

Commit

Permalink
Ensure that platform from group selection checks broadcast manager
Browse files Browse the repository at this point in the history
for updates to task config.
Ensure that 255 callback gives a sensible platform in warning messages.
  • Loading branch information
wxtim committed Aug 29, 2024
1 parent 347921f commit a1e9628
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 4 deletions.
1 change: 1 addition & 0 deletions changes.d/6330.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where broadcasting failed to change platform selected after host selection failure.
4 changes: 2 additions & 2 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,12 @@ def _run_callback(callback, args_=None):
# Backup, get a platform name from the config:
for arg in callback_args:
if isinstance(arg, TaskProxy):
platform_name = arg.tdef.rtconfig['platform']
platform_name = arg.platform['name']

Check warning on line 560 in cylc/flow/subprocpool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/subprocpool.py#L560

Added line #L560 was not covered by tests
elif (
isinstance(arg, list)
and isinstance(arg[0], TaskProxy)
):
platform_name = arg[0].tdef.rtconfig['platform']
platform_name = arg[0].platform['name']

if cls.ssh_255_fail(ctx) or cls.rsync_255_fail(ctx, platform) is True:
# Job log retrieval passes a special object as a command key
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,12 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,

# Get another platform, if task config platform is a group
use_next_platform_in_group = False
if itask.tdef.rtconfig['platform']:
bc_mgr = self.task_events_mgr.broadcast_mgr
rtconf = bc_mgr.get_updated_rtconfig(itask)
if rtconf['platform']:
try:
platform = get_platform(
itask.tdef.rtconfig['platform'],
rtconf['platform'],
bad_hosts=self.bad_hosts
)
except PlatformLookupError:
Expand Down
49 changes: 49 additions & 0 deletions tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,52 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit(
schd.task_job_mgr._prep_submit_task_job(
schd.workflow, task_a)
assert not task_a.summary.get('execution_time_limit', '')


async def test_broadcast_platform_change(
mock_glbl_cfg,
flow,
scheduler,
start,
log_filter,
):
"""Broadcast can change task platform.
Even after host selection failure.
see https://github.com/cylc/cylc-flow/issues/6320
"""
mock_glbl_cfg(
'cylc.flow.platforms.glbl_cfg',
'''
[platforms]
[[foo]]
hosts = food
''')

id_ = flow({
"scheduling": {"graph": {"R1": "mytask"}},
# Platform = None doesn't cause this issue!
"runtime": {"mytask": {"platform": "localhost"}}})

schd = scheduler(id_, run_mode='live')

async with start(schd) as log:
# Change the task platform with broadcast:
schd.broadcast_mgr.put_broadcast(
['1'], ['mytask'], [{'platform': 'foo'}])

# Simulate prior failure to contact hosts:
schd.task_job_mgr.task_remote_mgr.bad_hosts = {'food'}

# Attempt job submission:
schd.task_job_mgr.submit_task_jobs(
schd.workflow,
schd.pool.get_tasks(),
schd.server.curve_auth,
schd.server.client_pub_key_dir)

# Check that task platform hasn't become "localhost":
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
# ... and that remote init failed because all hosts bad:
assert log_filter(log, contains="(no hosts were reachable)")

0 comments on commit a1e9628

Please sign in to comment.