Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix failure to clean on remote host with leftover contact file #4978

Merged
merged 4 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_functional.yml
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,4 @@ jobs:
with:
name: '${{ github.workflow }} ${{ matrix.name }} ${{ matrix.chunk }}'
flags: functional-tests
fail_ci_if_error: true
fail_ci_if_error: false
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ now fails if
[owner setting](https://cylc.github.io/cylc-doc/latest/html/reference/config/workflow.html#flow.cylc[runtime][%3Cnamespace%3E][remote]owner)
is used, as that setting no longer has any effect.

[#4978](https://github.com/cylc/cylc-flow/pull/4978) - `cylc clean`: fix
occasional failure to clean on remote hosts due to leftover contact file.

[#4889](https://github.com/cylc/cylc-flow/pull/4889) - `cylc clean`: don't
prompt if no matching workflows.

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/main_loop/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ def _check_contact_file(scheduler):
except (AssertionError, IOError, ValueError, ServiceFileError):
raise CylcError(
'%s: contact file corrupted/modified and may be left'
% workflow_files.get_contact_file(scheduler.workflow)
% workflow_files.get_contact_file_path(scheduler.workflow)
)
2 changes: 1 addition & 1 deletion cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def dir_is_flow(listing: Iterable[Path]) -> Optional[bool]:
if path.name == WorkflowFiles.LOG_DIR:
if (
(path / 'suite' / 'log').exists()
and not (path / 'workflow').exists()
and not (path / 'scheduler').exists()
Copy link
Member Author

@MetRonnie MetRonnie Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change but spotted this leftover today

):
# ... already run by Cylc 7 (and not re-run by Cylc 8 after
# removing the DB)
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ async def _shutdown(self, reason: Exception) -> None:
# from running event handlers), because the existence of the file is
# used to determine if the workflow is running
if self.contact_data:
fname = workflow_files.get_contact_file(self.workflow)
fname = workflow_files.get_contact_file_path(self.workflow)
try:
os.unlink(fname)
except OSError as exc:
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
KeyOwner,
KeyType,
WorkflowFiles,
get_contact_file,
get_contact_file_path,
get_workflow_srv_dir,
)

Expand Down Expand Up @@ -596,7 +596,7 @@ def _remote_init_items(self, comms_meth: CommsMeth):
if comms_meth in [CommsMeth.SSH, CommsMeth.ZMQ]:
# Contact file
items.append((
get_contact_file(self.workflow),
get_contact_file_path(self.workflow),
os.path.join(
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.Service.CONTACT)))
Expand Down
36 changes: 16 additions & 20 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def detect_old_contact_file(reg: str, contact_data=None) -> None:
# NOTE: can raise CylcError
process_is_running = _is_process_running(old_host, old_pid, old_cmd)

fname = get_contact_file(reg)
fname = get_contact_file_path(reg)
if process_is_running:
# ... the process is running, raise an exception
raise ServiceFileError(
Expand Down Expand Up @@ -541,7 +541,7 @@ def dump_contact_file(reg, data):
# The double fsync logic ensures that if the contact file is written to
# a shared file system e.g. via NFS, it will be immediately visible
# to processes on other hosts after the current process returns.
with open(get_contact_file(reg), "wb") as handle:
with open(get_contact_file_path(reg), "wb") as handle:
for key, value in sorted(data.items()):
handle.write(("%s=%s\n" % (key, value)).encode())
os.fsync(handle.fileno())
Expand All @@ -550,7 +550,7 @@ def dump_contact_file(reg, data):
os.close(dir_fileno)


def get_contact_file(reg):
def get_contact_file_path(reg: str) -> str:
"""Return name of contact file."""
return os.path.join(
get_workflow_srv_dir(reg), WorkflowFiles.Service.CONTACT)
Expand Down Expand Up @@ -607,10 +607,10 @@ def get_workflow_srv_dir(reg):

def load_contact_file(reg: str) -> Dict[str, str]:
"""Load contact file. Return data as key=value dict."""
file_base = WorkflowFiles.Service.CONTACT
path = get_workflow_srv_dir(reg)
file_content = _load_local_item(file_base, path)
if not file_content:
try:
with open(get_contact_file_path(reg)) as f:
file_content = f.read()
except IOError:
raise ServiceFileError("Couldn't load contact file")
data: Dict[str, str] = {}
for line in file_content.splitlines():
Expand All @@ -626,10 +626,7 @@ def load_contact_file(reg: str) -> Dict[str, str]:

async def load_contact_file_async(reg, run_dir=None):
if not run_dir:
path = Path(
get_workflow_srv_dir(reg),
WorkflowFiles.Service.CONTACT
)
path = Path(get_contact_file_path(reg))
else:
path = Path(
run_dir,
Expand Down Expand Up @@ -745,6 +742,14 @@ def _clean_check(opts: 'Values', reg: str, run_dir: Path) -> None:
# Thing to clean must be a dir or broken symlink:
if not run_dir.is_dir() and not run_dir.is_symlink():
raise FileNotFoundError(f"No directory to clean at {run_dir}")
db_path = (
run_dir / WorkflowFiles.Service.DIRNAME / WorkflowFiles.Service.DB
)
if opts.local_only and not db_path.is_file():
# Will reach here if this is cylc clean re-invoked on remote host
# (workflow DB only exists on scheduler host); don't need to worry
# about contact file.
return
try:
detect_old_contact_file(reg)
except ServiceFileError as exc:
Expand Down Expand Up @@ -1147,15 +1152,6 @@ def get_workflow_title(reg):
return title


def _load_local_item(item, path):
    """Read the file named ``item`` inside directory ``path``.

    Returns the full text content of the file, or None if the file
    cannot be opened or read (IOError).
    """
    target = os.path.join(path, item)
    try:
        with open(target) as handle:
            content = handle.read()
    except IOError:
        return None
    return content


def get_platforms_from_db(run_dir):
"""Load the set of names of platforms (that jobs ran on) from the
workflow database.
Expand Down
56 changes: 56 additions & 0 deletions tests/functional/cylc-clean/05-old-remote-contact.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
# Test that cylc clean successfully removes the workflow on a remote host even
# when there is a leftover contact file with an unreachable host recorded in it

# Platform requirements for this test; set before sourcing test_header
# (presumably read by the test framework to select/skip platforms - confirm).
export REQUIRE_PLATFORM='loc:remote fs:indep comms:tcp'
. "$(dirname "$0")/test_header"
# Three tests below: validate, run, clean.
set_test_number 3

# The test platform's configured ssh command followed by the test host;
# used later (unquoted, so it word-splits) to run a command on that host.
SSH_CMD="$(cylc config -d -i "[platforms][${CYLC_TEST_PLATFORM}]ssh command") ${CYLC_TEST_HOST}"

# Minimal workflow with a single task that runs on the test platform.
init_workflow "${TEST_NAME_BASE}" << __FLOW__
[scheduling]
[[graph]]
R1 = dilophosaurus
[runtime]
[[dilophosaurus]]
platform = ${CYLC_TEST_PLATFORM}
__FLOW__


run_ok "${TEST_NAME_BASE}-validate" cylc validate "$WORKFLOW_NAME"

# Run the workflow in the foreground (--no-detach) so that it has finished
# before the fake contact file is written.
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "$WORKFLOW_NAME" --no-detach

# Create a fake old contact file on the remote host
# (\$HOME is escaped so it is expanded on the remote side, while
# ${WORKFLOW_NAME} is expanded locally.)
# NOTE(review): the here-document is attached to the ssh command, so it -
# not the `echo |` pipe - appears to supply stdin to `cat`; confirm whether
# the leading `echo |` is needed.
echo | $SSH_CMD "cat > \$HOME/cylc-run/${WORKFLOW_NAME}/.service/contact" << EOF
CYLC_API=5
CYLC_VERSION=8.0.0
CYLC_WORKFLOW_COMMAND=echo Hello John
CYLC_WORKFLOW_HOST=unreachable.isla_nublar.ingen
CYLC_WORKFLOW_ID=${WORKFLOW_NAME}
CYLC_WORKFLOW_PID=99999
CYLC_WORKFLOW_PORT=00000
EOF

# The actual check: cleaning must succeed despite the leftover contact file.
TEST_NAME="cylc-clean"
run_ok "$TEST_NAME" cylc clean --remote "$WORKFLOW_NAME"
# Presumably prints the captured stdout/stderr of the test for debugging -
# helper is defined in test_header; confirm.
dump_std "$TEST_NAME"

purge
8 changes: 4 additions & 4 deletions tests/integration/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ def cylc7_run_dir(tmp_path):
cylc8 = tmp_path / 'cylc8'
cylc8.mkdir()
(cylc8 / WorkflowFiles.SUITE_RC).touch()
Path(cylc8, WorkflowFiles.LOG_DIR, 'workflow').mkdir(parents=True)
Path(cylc8, WorkflowFiles.LOG_DIR, 'workflow', 'log').touch()
Path(cylc8, WorkflowFiles.LOG_DIR, 'scheduler').mkdir(parents=True)
Path(cylc8, WorkflowFiles.LOG_DIR, 'scheduler', 'log').touch()

# a Cylc 7 workflow installed by Cylc 8 but not run yet.
# (should appear in scan results)
Expand All @@ -449,8 +449,8 @@ def cylc7_run_dir(tmp_path):
(cylc8 / WorkflowFiles.SUITE_RC).touch()
Path(cylc8, WorkflowFiles.LOG_DIR, 'suite').mkdir(parents=True)
Path(cylc8, WorkflowFiles.LOG_DIR, 'suite', 'log').touch()
Path(cylc8, WorkflowFiles.LOG_DIR, 'workflow').mkdir(parents=True)
Path(cylc8, WorkflowFiles.LOG_DIR, 'workflow', 'log').touch()
Path(cylc8, WorkflowFiles.LOG_DIR, 'scheduler').mkdir(parents=True)
Path(cylc8, WorkflowFiles.LOG_DIR, 'scheduler', 'log').touch()

return tmp_path

Expand Down
7 changes: 3 additions & 4 deletions tests/unit/test_workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ def test_clean_check__fail(
stopped: bool,
err: Type[Exception],
err_msg: str,
monkeypatch: pytest.MonkeyPatch
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""Test that _clean_check() fails appropriately.

Expand All @@ -395,8 +396,6 @@ def test_clean_check__fail(
err: Expected error class.
err_msg: Message that is expected to be in the exception.
"""
run_dir = mock.Mock()

def mocked_detect_old_contact_file(*a, **k):
if not stopped:
raise ServiceFileError('Mocked error')
Expand All @@ -407,7 +406,7 @@ def mocked_detect_old_contact_file(*a, **k):
)

with pytest.raises(err) as exc:
workflow_files._clean_check(CleanOptions(), reg, run_dir)
workflow_files._clean_check(CleanOptions(), reg, tmp_path)
assert err_msg in str(exc.value)


Expand Down