Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only poll non-waiting tasks #4658

Merged
merged 5 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ access to information on configured platforms.

### Fixes

[#4658](https://github.com/cylc/cylc-flow/pull/4658) -
Don't poll waiting tasks (which may have the submit number of a previous job).

[#4620](https://github.com/cylc/cylc-flow/pull/4620) -
Fix queue interactions with the scheduler paused and task held states.

Expand Down
7 changes: 4 additions & 3 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,9 +1211,10 @@ async def mutator(root, info, command=None, workflows=None,
workflows = []
if exworkflows is None:
exworkflows = []
w_args = {}
w_args['workflows'] = [Tokens(w_id) for w_id in workflows]
w_args['exworkflows'] = [Tokens(w_id) for w_id in exworkflows]
w_args = {
'workflows': [Tokens(w_id) for w_id in workflows],
'exworkflows': [Tokens(w_id) for w_id in exworkflows]
}
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
if args.get('args', False):
args.update(args.get('args', {}))
args.pop('args')
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,6 @@ def command_poll_tasks(self, items: List[str]):
return
itasks, _, bad_items = self.pool.filter_task_proxies(items)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
# (Could filter itasks by state here if needed)
return len(bad_items)

def command_kill_tasks(self, items: List[str]):
Expand Down
10 changes: 9 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE
)
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -203,7 +204,14 @@ def poll_task_jobs(self, workflow, itasks, msg=None):
if msg is not None:
LOG.info(msg)
self._run_job_cmd(
self.JOBS_POLL, workflow, itasks,
self.JOBS_POLL, workflow,
[
# Don't poll waiting tasks. (This is not only pointless, it
# is dangerous because a task waiting to rerun has the
# submit number of its previous job, which can be polled).
itask for itask in itasks
if itask.state.status != TASK_STATUS_WAITING
],
self._poll_task_jobs_callback,
self._poll_task_jobs_callback_255
)
Expand Down
31 changes: 31 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,34 @@ def _submit_task_jobs(_, itasks, *args, **kwargs):
return submitted_tasks

return _disable_submission


@pytest.fixture
def capture_polling():
"""Suppress job polling and capture polled tasks.

Provides a function to run on a started Scheduler.

async with start(schd):
polled_tasks = capture_polling(schd)

or:

async with run(schd):
polled_tasks = capture_polling(schd)

"""
def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]':
polled_tasks: 'Set[TaskProxy]' = set()

def run_job_cmd(
_1, _2, itasks, _3, _4=None
):
nonlocal polled_tasks
polled_tasks.update(itasks)
return itasks

schd.task_job_mgr._run_job_cmd = run_job_cmd # type: ignore
return polled_tasks

return _disable_polling
62 changes: 62 additions & 0 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@

from cylc.flow.exceptions import CylcError
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED
)


Fixture = Any

Expand Down Expand Up @@ -161,3 +169,57 @@ async def test_holding_tasks_whilst_scheduler_paused(
one.release_queued_tasks()
assert len(one.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1


async def test_no_poll_waiting_tasks(
capture_polling,
flow,
one_conf,
run,
scheduler,
):
"""Waiting tasks shouldn't be polled.

If a waiting task previously it will have the submit number of its previous
job, and polling would erroneously return the state of that job.

See https://github.com/cylc/cylc-flow/issues/4658
"""
reg = flow(one_conf)
one = scheduler(reg, paused_start=True)

log: pytest.LogCaptureFixture
async with run(one) as log:

# Test assumes start up with a waiting task.
task = (one.pool.get_all_tasks())[0]
assert task.state.status == TASK_STATUS_WAITING

polled_tasks = capture_polling(one)

# Waiting tasks should not be polled.
one.command_poll_tasks(['*/*'])
assert polled_tasks == set()

# Even if they have a submit number.
task.submit_num = 1
one.command_poll_tasks(['*/*'])
assert len(polled_tasks) == 0

# But these states should be:
for state in [
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
]:
task.state.status = state
one.command_poll_tasks(['*/*'])
assert len(polled_tasks) == 1
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
polled_tasks.clear()

# Shut down with a running task.
task.state.status = TASK_STATUS_RUNNING

# For good measure, check the faked running task is reported at shutdown.
assert "Orphaned task jobs:\n* 1/one (running)" in log.messages