Skip to content

Commit

Permalink
Fix host selection bug for deprecated [runtime][<task>][remote]host
Browse files Browse the repository at this point in the history
… syntax (#4570)

Fix host selection bug

Avoid selecting localhost if host specified using deprecated `[runtime][<task>][remote]host` can't be reached
  • Loading branch information
MetRonnie authored Jan 17, 2022
1 parent 8c81b09 commit 760b077
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ of workflows inside other installed workflows.
[#4540](https://github.com/cylc/cylc-flow/pull/4540) - Handle the `/` character
in job names, for PBS 19.2.1+.

[#4570](https://github.com/cylc/cylc-flow/pull/4570) - Fix incorrect fallback
to localhost if `[runtime][<task>][remote]host` is unreachable.

[#4543](https://github.com/cylc/cylc-flow/pull/4543) -
`cylc play --stopcp=reload` now takes its value from
`[scheduling]stop after cycle point` instead of using the final cycle point.
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,18 @@ def log_platform_event(

@overload
def get_platform(
task_conf: Union[str, None] = None, task_id: str = UNKNOWN_TASK
task_conf: Union[str, None] = None,
task_id: str = UNKNOWN_TASK,
bad_hosts: Optional[Set[str]] = None
) -> Dict[str, Any]:
...


@overload
def get_platform(
task_conf: Dict[str, Any], task_id: str = UNKNOWN_TASK
task_conf: Dict[str, Any],
task_id: str = UNKNOWN_TASK,
bad_hosts: Optional[Set[str]] = None
) -> Optional[Dict[str, Any]]:
...

Expand Down
48 changes: 24 additions & 24 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,18 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,

# Get another platform, if task config platform is a group
use_next_platform_in_group = False
try:
platform = get_platform(
itask.tdef.rtconfig['platform'],
bad_hosts=self.bad_hosts
)
# If were able to select a new platform;
if platform and platform != itask.platform:
use_next_platform_in_group = True
except NoPlatformsError:
use_next_platform_in_group = False
if itask.tdef.rtconfig['platform']:
try:
platform = get_platform(
itask.tdef.rtconfig['platform'],
bad_hosts=self.bad_hosts
)
except NoPlatformsError:
pass
else:
# If were able to select a new platform;
if platform and platform != itask.platform:
use_next_platform_in_group = True

if use_next_platform_in_group:
# store the previous platform's hosts so that when
Expand All @@ -317,10 +319,8 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# group selected in task config are exhausted we clear
# bad_hosts for all the hosts we have
# tried for this platform or group.
self.bad_hosts = (
self.bad_hosts - set(itask.platform['hosts']))
self.bad_hosts = (
self.bad_hosts - self.bad_hosts_to_clear)
self.bad_hosts -= set(itask.platform['hosts'])
self.bad_hosts -= self.bad_hosts_to_clear
self.bad_hosts_to_clear.clear()
LOG.critical(
PlatformError(
Expand All @@ -339,7 +339,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
install_target = get_install_target_from_platform(platform)
ri_map = self.task_remote_mgr.remote_init_map

if (ri_map.get(install_target) != REMOTE_FILE_INSTALL_DONE):
if ri_map.get(install_target) != REMOTE_FILE_INSTALL_DONE:
if install_target == get_localhost_install_target():
# Skip init and file install for localhost.
LOG.debug(f"REMOTE INIT NOT REQUIRED for {install_target}")
Expand All @@ -358,12 +358,12 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
)
continue

elif (ri_map[install_target] == REMOTE_INIT_DONE):
elif ri_map[install_target] == REMOTE_INIT_DONE:
# Already done remote init so move on to file install
self.task_remote_mgr.file_install(platform)
continue

elif (ri_map[install_target] in self.IN_PROGRESS):
elif ri_map[install_target] in self.IN_PROGRESS:
# Remote init or file install in progress.
for itask in itasks:
msg = self.IN_PROGRESS[ri_map[install_target]]
Expand All @@ -372,7 +372,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
msg
)
continue
elif (ri_map[install_target] == REMOTE_INIT_255):
elif ri_map[install_target] == REMOTE_INIT_255:
# Remote init previously failed becase a host was
# unreachable, so start it again.
del ri_map[install_target]
Expand Down Expand Up @@ -434,8 +434,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,

if ri_map[install_target] == REMOTE_FILE_INSTALL_255:
del ri_map[install_target]
self.task_remote_mgr.file_install(
platform)
self.task_remote_mgr.file_install(platform)
for itask in itasks:
self.data_store_mgr.delta_job_msg(
itask.tokens.duplicate(
Expand All @@ -445,12 +444,13 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
)
continue

if (ri_map[install_target] in [REMOTE_INIT_FAILED,
REMOTE_FILE_INSTALL_FAILED]):
if ri_map[install_target] in {
REMOTE_INIT_FAILED, REMOTE_FILE_INSTALL_FAILED
}:
# Remote init or install failed. Set submit-failed for all
# affected tasks and remove target from remote init map
# - this enables new tasks to re-initialise that target
init_error = (ri_map[install_target])
init_error = ri_map[install_target]
del ri_map[install_target]
for itask in itasks:
itask.waiting_on_job_prep = False
Expand Down Expand Up @@ -1162,7 +1162,7 @@ def _prep_submit_task_job(self, workflow, itask, check_syntax=True):
rtconfig['remote']['host'] = host_n

try:
platform = get_platform(rtconfig, self.bad_hosts)
platform = get_platform(rtconfig, bad_hosts=self.bad_hosts)

except PlatformLookupError as exc:
# Submit number not yet incremented
Expand Down
59 changes: 59 additions & 0 deletions tests/functional/intelligent-host-selection/07-cylc7-badhost.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#------------------------------------------------------------------------------
# Test task does not run on localhost if Cylc 7 syntax
# [runtime][<task>][remote]host is unreachable -
# https://github.com/cylc/cylc-flow/issues/4569

. "$(dirname "$0")/test_header"
set_test_number 4

# Host name picked for unlikelihood of matching any real host
BAD_HOST="f65b965bb914"

create_test_global_config "" "
[platforms]
[[badhostplatform]]
hosts = ${BAD_HOST}
"

init_workflow "${TEST_NAME_BASE}" << __FLOW__
[scheduler]
[[events]]
stall timeout = PT0M
abort on stall timeout = True
[scheduling]
cycling mode = integer
[[graph]]
R1 = sattler
[runtime]
[[sattler]]
[[[remote]]]
host = ${BAD_HOST}
__FLOW__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

workflow_run_fail "${TEST_NAME_BASE}-run" \
cylc play --no-detach "${WORKFLOW_NAME}"

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-1" \
"platform: badhostplatform - initialisation did not complete (no hosts were reachable)"

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-2" "CRITICAL - Workflow stalled"

purge

0 comments on commit 760b077

Please sign in to comment.