Skip to content

Commit

Permalink
Only poll non-waiting tasks (#4658)
Browse files Browse the repository at this point in the history
Don't poll waiting tasks.
  • Loading branch information
hjoliver authored Feb 14, 2022
1 parent 5d7cd85 commit e733725
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ access to information on configured platforms.

### Fixes

[#4658](https://github.com/cylc/cylc-flow/pull/4658) -
Don't poll waiting tasks (which may have the submit number of a previous job).

[#4620](https://github.com/cylc/cylc-flow/pull/4620) -
Fix queue interactions with the scheduler paused and task held states.

Expand Down
1 change: 0 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,6 @@ def command_poll_tasks(self, items: List[str]):
return
itasks, _, bad_items = self.pool.filter_task_proxies(items)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
# (Could filter itasks by state here if needed)
return len(bad_items)

def command_kill_tasks(self, items: List[str]):
Expand Down
10 changes: 9 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE
)
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -203,7 +204,14 @@ def poll_task_jobs(self, workflow, itasks, msg=None):
if msg is not None:
LOG.info(msg)
self._run_job_cmd(
self.JOBS_POLL, workflow, itasks,
self.JOBS_POLL, workflow,
[
# Don't poll waiting tasks. (This is not only pointless, it
# is dangerous because a task waiting to rerun has the
# submit number of its previous job, which can be polled).
itask for itask in itasks
if itask.state.status != TASK_STATUS_WAITING
],
self._poll_task_jobs_callback,
self._poll_task_jobs_callback_255
)
Expand Down
31 changes: 31 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,34 @@ def _submit_task_jobs(_, itasks, *args, **kwargs):
return submitted_tasks

return _disable_submission


@pytest.fixture
def capture_polling():
"""Suppress job polling and capture polled tasks.
Provides a function to run on a started Scheduler.
async with start(schd):
polled_tasks = capture_polling(schd)
or:
async with run(schd):
polled_tasks = capture_polling(schd)
"""
def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]':
polled_tasks: 'Set[TaskProxy]' = set()

def run_job_cmd(
_1, _2, itasks, _3, _4=None
):
nonlocal polled_tasks
polled_tasks.update(itasks)
return itasks

schd.task_job_mgr._run_job_cmd = run_job_cmd # type: ignore
return polled_tasks

return _disable_polling
62 changes: 62 additions & 0 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@

from cylc.flow.exceptions import CylcError
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED
)


Fixture = Any

Expand Down Expand Up @@ -161,3 +169,57 @@ async def test_holding_tasks_whilst_scheduler_paused(
one.release_queued_tasks()
assert len(one.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1


async def test_no_poll_waiting_tasks(
capture_polling,
flow,
one_conf,
run,
scheduler,
):
"""Waiting tasks shouldn't be polled.
If a waiting task previously it will have the submit number of its previous
job, and polling would erroneously return the state of that job.
See https://github.com/cylc/cylc-flow/issues/4658
"""
reg = flow(one_conf)
one = scheduler(reg, paused_start=True)

log: pytest.LogCaptureFixture
async with run(one) as log:

# Test assumes start up with a waiting task.
task = (one.pool.get_all_tasks())[0]
assert task.state.status == TASK_STATUS_WAITING

polled_tasks = capture_polling(one)

# Waiting tasks should not be polled.
one.command_poll_tasks(['*/*'])
assert polled_tasks == set()

# Even if they have a submit number.
task.submit_num = 1
one.command_poll_tasks(['*/*'])
assert len(polled_tasks) == 0

# But these states should be:
for state in [
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
]:
task.state.status = state
one.command_poll_tasks(['*/*'])
assert len(polled_tasks) == 1
polled_tasks.clear()

# Shut down with a running task.
task.state.status = TASK_STATUS_RUNNING

# For good measure, check the faked running task is reported at shutdown.
assert "Orphaned task jobs:\n* 1/one (running)" in log.messages

0 comments on commit e733725

Please sign in to comment.