Skip to content

Commit

Permalink
Add test for ParsecError traceback
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Feb 28, 2022
1 parent 0a1d3b7 commit 8ccce2b
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 7 deletions.
3 changes: 2 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ def get_command_method(self, command_name: str) -> Callable:
"""Return a command processing method or raise AttributeError."""
return getattr(self, f'command_{command_name}')

def queue_command(self, command, kwargs):
def queue_command(self, command: str, kwargs: dict) -> None:
self.command_queue.put((
command,
tuple(kwargs.values()), {}
Expand Down Expand Up @@ -1753,6 +1753,7 @@ async def _shutdown(self, reason: Exception) -> None:
[(b'shutdown', str(reason).encode('utf-8'))]
)
self.publisher.stop()
self.publisher = None
if self.curve_auth:
self.curve_auth.stop() # stop the authentication thread

Expand Down
6 changes: 5 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@ def log_filter():
level: Filter out records if they aren't at this logging level.
contains: Filter out records if this string is not in the message.
regex: Filter out records if the message doesn't match this regex.
exact_match: Filter out records if the message does not exactly match
this string.
"""
def _log_filter(
log: pytest.LogCaptureFixture,
name: Optional[str] = None,
level: Optional[int] = None,
contains: Optional[str] = None,
regex: Optional[str] = None
regex: Optional[str] = None,
exact_match: Optional[str] = None,
) -> List[Tuple[str, int, str]]:
return [
(log_name, log_level, log_message)
Expand All @@ -100,6 +103,7 @@ def _log_filter(
and (level is None or level == log_level)
and (contains is None or contains in log_message)
and (regex is None or re.match(regex, log_message))
and (exact_match is None or exact_match == log_message)
]
return _log_filter

Expand Down
113 changes: 113 additions & 0 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

import asyncio
import logging
from pathlib import Path
import pytest
from typing import Any, Callable

from cylc.flow.exceptions import CylcError
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.pathutil import get_cylc_run_dir, get_workflow_run_dir
from cylc.flow.scheduler import Scheduler, SchedulerStop
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
Expand All @@ -29,10 +32,15 @@
TASK_STATUS_FAILED
)

from .utils.flow_tools import _make_flow


Fixture = Any


TRACEBACK_MSG = "Traceback (most recent call last):"


async def test_is_paused_after_stop(
one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture,
db_select: Fixture):
Expand Down Expand Up @@ -237,3 +245,108 @@ async def test_no_poll_waiting_tasks(

# For good measure, check the faked running task is reported at shutdown.
assert "Orphaned task jobs:\n* 1/one (running)" in log.messages


@pytest.mark.parametrize('reload', [False, True])
@pytest.mark.parametrize(
'test_conf, expected_msg',
[
pytest.param(
{'Alan Wake': "It's not a lake, it's an ocean"},
"IllegalItemError: Alan Wake",
id="illegal item"
),
pytest.param(
{
'scheduling': {
'initial cycle point': "2k22",
'graph': {'R1': "a => b"}
}
},
("IllegalValueError: (type=cycle point) "
"[scheduling]initial cycle point = 2k22 - (Invalid cycle point)"),
id="illegal cycle point"
)
]
)
async def test_illegal_config_load(
test_conf: dict,
expected_msg: str,
reload: bool,
flow: Callable,
one_conf: dict,
start: Callable,
run: Callable,
scheduler: Callable,
log_filter: Callable
):
"""Test that ParsecErrors (illegal config) - that occur during config load
when running a workflow - are displayed without traceback.
Params:
test_conf: Dict to update one_conf with.
expected_msg: Expected log message at error level.
reload: If False, test a workflow start with invalid config.
If True, test a workflow start with valid config followed by
reload with invalid config.
"""
if not reload:
one_conf.update(test_conf)
reg: str = flow(one_conf)
schd: Scheduler = scheduler(reg)
log: pytest.LogCaptureFixture

if reload:
one_conf.update(test_conf)
run_dir = Path(get_workflow_run_dir(reg))
async with run(schd) as log:
# Shouldn't be any errors at this stage:
assert not log_filter(log, level=logging.ERROR)
# Modify flow.cylc:
_make_flow(get_cylc_run_dir(), run_dir, one_conf, '')
schd.queue_command('reload_workflow', {})
assert log_filter(
log, level=logging.WARNING, exact_match=expected_msg
)
else:
with pytest.raises(ParsecError):
async with start(schd) as log:
pass
assert log_filter(
log,
level=logging.ERROR,
exact_match=f"Workflow shutting down - {expected_msg}"
)

assert TRACEBACK_MSG not in log.text


async def test_unexpected_ParsecError(
flow: Callable,
one_conf: dict,
start: Callable,
scheduler: Callable,
log_filter: Callable,
monkeypatch: pytest.MonkeyPatch
):
"""Test that ParsecErrors - that occur at any time other than config load
when running a workflow - are displayed with traceback, because they are
not expected."""
reg: str = flow(one_conf)
schd: Scheduler = scheduler(reg)
log: pytest.LogCaptureFixture

def raise_ParsecError(*a, **k):
raise ParsecError("Mock error")

monkeypatch.setattr(schd, '_configure_contact', raise_ParsecError)

with pytest.raises(ParsecError):
async with start(schd) as log:
pass

assert log_filter(
log, level=logging.CRITICAL,
exact_match="Workflow shutting down - Mock error"
)
assert TRACEBACK_MSG in log.text
16 changes: 11 additions & 5 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
"""

import asyncio
from pathlib import Path
from async_timeout import timeout
from contextlib import asynccontextmanager, contextmanager
import logging
import pytest
from typing import Any, Optional
from typing import Any, Optional, Union
from uuid import uuid1

from cylc.flow import CYLC_LOG
Expand All @@ -38,13 +39,18 @@
from .flow_writer import flow_config_str


def _make_flow(run_dir, test_dir, conf, name=None):
def _make_flow(
cylc_run_dir: Union[Path, str],
test_dir: Path,
conf: Union[dict, str],
name: Optional[str] = None
) -> str:
"""Construct a workflow on the filesystem."""
if not name:
if name is None:
name = str(uuid1())
flow_run_dir = (test_dir / name)
flow_run_dir.mkdir(parents=True)
reg = str(flow_run_dir.relative_to(run_dir))
flow_run_dir.mkdir(parents=True, exist_ok=True)
reg = str(flow_run_dir.relative_to(cylc_run_dir))
if isinstance(conf, dict):
conf = flow_config_str(conf)
with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
Expand Down

0 comments on commit 8ccce2b

Please sign in to comment.