Skip to content

Commit

Permalink
Add new integration test.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Feb 13, 2022
1 parent 95d3ce9 commit 65e3952
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
31 changes: 31 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,34 @@ def _submit_task_jobs(_, itasks, *args, **kwargs):
return submitted_tasks

return _disable_submission


@pytest.fixture
def capture_polling():
"""Suppress job polling and capture polled tasks.
Provides a function to run on a started Scheduler.
async with start(schd):
polled_tasks = capture_polling(schd)
or:
async with run(schd):
polled_tasks = capture_polling(schd)
"""
def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]':
polled_tasks: 'Set[TaskProxy]' = set()

def run_job_cmd(
_1, _2, itasks, _3, _4=None
):
nonlocal polled_tasks
polled_tasks.update(itasks)
return itasks

schd.task_job_mgr._run_job_cmd = run_job_cmd # type: ignore
return polled_tasks

return _disable_polling
61 changes: 61 additions & 0 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@

from cylc.flow.exceptions import CylcError
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED
)


Fixture = Any

Expand Down Expand Up @@ -161,3 +169,56 @@ async def test_holding_tasks_whilst_scheduler_paused(
one.release_queued_tasks()
assert len(one.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1


async def test_no_poll_waiting_tasks(
capture_polling,
flow,
one_conf,
run,
scheduler,
):
"""Waiting tasks shouldn't be polled.
If a waiting task previously it will have the submit number of its previous
job, and polling would erroneously return the state of that job.
See https://github.com/cylc/cylc-flow/issues/4658
"""
reg = flow(one_conf)
one = scheduler(reg, paused_start=True)

log: pytest.LogCaptureFixture
async with run(one) as log:

# Test assumes start up with a waiting task.
task = (one.pool.get_all_tasks())[0]
assert task.state.status == TASK_STATUS_WAITING

polled_tasks = capture_polling(one)

# Waiting tasks should not be polled.
one.command_poll_tasks(['*/*'])
assert polled_tasks == set()

# Even if they have a submit number.
task.submit_num = 1
one.command_poll_tasks(['*/*'])
assert len(polled_tasks) == 0

# But these states should be:
for state in [
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
]:
task.state.status = state
one.command_poll_tasks(['*/*'])
assert len(polled_tasks) == 1

# Shut down with a running task.
task.state.status = TASK_STATUS_RUNNING

# For good measure, check the faked running task is reported at shutdown.
assert "Orphaned task jobs:\n* 1/one (running)" in log.messages

0 comments on commit 65e3952

Please sign in to comment.