Merge pull request #4620 from oliver-sanders/4278
queues: fix interactions with the scheduler paused and task held states
hjoliver authored Feb 11, 2022
2 parents 2dfb12a + b102fa3 commit 9a10c85
Showing 15 changed files with 372 additions and 136 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
@@ -101,6 +101,9 @@ access to information on configured platforms.

### Fixes

[#4620](https://github.com/cylc/cylc-flow/pull/4620) -
Fix queue interactions with the scheduler paused and task held states.

[#4667](https://github.com/cylc/cylc-flow/pull/4667) - Check manually triggered
tasks are not already preparing for job submission.

61 changes: 35 additions & 26 deletions cylc/flow/scheduler.py
@@ -237,8 +237,8 @@ class Scheduler:
curve_auth: Optional[ThreadAuthenticator] = None
client_pub_key_dir: Optional[str] = None

# queue-released tasks still in prep
pre_submit_tasks: Optional[List[TaskProxy]] = None
# queue-released tasks awaiting job preparation
pre_prep_tasks: Optional[List[TaskProxy]] = None

# profiling
_profile_amounts: Optional[dict] = None
@@ -268,7 +268,7 @@ def __init__(self, reg: str, options: Values) -> None:
# mutable defaults
self._profile_amounts = {}
self._profile_update_times = {}
self.pre_submit_tasks = []
self.pre_prep_tasks = []
self.bad_hosts: Set[str] = set()

self.restored_stop_task_id = None
@@ -598,10 +598,9 @@ async def log_start(self):
LOG.info("Quiet mode on")
LOG.setLevel(log_level)

async def start_scheduler(self):
async def run_scheduler(self):
"""Start the scheduler main loop."""
try:
self._configure_contact()
if self.is_restart:
self.restart_remote_init()
self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
@@ -641,27 +640,30 @@ async def start_scheduler(self):
finally:
self.profiler.stop()

async def run(self):
"""Run the startup sequence.
* initialise
* configure
* start_servers
* start_scheduler
async def start(self):
"""Run the startup sequence but don't set the main loop running.
Lightweight wrapper for convenience.
Lightweight wrapper for testing convenience.
"""
try:
await self.initialise()
await self.configure()
await self.start_servers()
await self.log_start()
self._configure_contact()
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)
else:
# note start_scheduler handles its own shutdown logic
await self.start_scheduler()

async def run(self):
"""Run the startup sequence and set the main loop running.
Lightweight wrapper for testing convenience.
"""
await self.start()
# note run_scheduler handles its own shutdown logic
await self.run_scheduler()

def _load_pool_from_tasks(self):
"""Load task pool with specified tasks, for a new run."""
@@ -1252,20 +1254,27 @@ def release_queued_tasks(self):
"""
# Forget tasks that are no longer preparing for job submission.
self.pre_submit_tasks = [
itask for itask in self.pre_submit_tasks if
self.pre_prep_tasks = [
itask for itask in self.pre_prep_tasks if
itask.waiting_on_job_prep
]

# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()

if (
self.pre_submit_tasks and
not self.is_paused and
self.stop_mode is None and
self.auto_restart_time is None
not self.is_paused
and self.stop_mode is None
and self.auto_restart_time is None
):
# Add newly released tasks to those still preparing.
self.pre_prep_tasks += self.pool.release_queued_tasks(
# the number of tasks waiting to go through the task
# submission pipeline
self.pre_prep_tasks
)

if not self.pre_prep_tasks:
# No tasks to submit.
return

# Start the job submission process.
self.is_updated = True

@@ -1277,7 +1286,7 @@ def release_queued_tasks(self):
meth = LOG.info
for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_submit_tasks,
self.pre_prep_tasks,
self.curve_auth,
self.client_pub_key_dir,
self.config.run_mode('simulation')
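Read together, the hunks above rework `Scheduler.release_queued_tasks` so that nothing is pulled off the queues while the scheduler is paused, stopping, or awaiting auto-restart, and so that tasks already released but still awaiting job preparation keep occupying queue slots. A condensed sketch of the resulting flow (a simplified model for illustration, not the real class; `schd` stands in for the scheduler and its collaborators):

```python
def release_queued_tasks_sketch(schd) -> None:
    """Simplified model of the new release/submission gate."""
    # Forget tasks that have already moved past job preparation.
    schd.pre_prep_tasks = [
        itask for itask in schd.pre_prep_tasks
        if itask.waiting_on_job_prep
    ]
    # Only release new tasks when the scheduler can actually submit:
    # not paused, not stopping, not waiting on an auto-restart.
    if (
        not schd.is_paused
        and schd.stop_mode is None
        and schd.auto_restart_time is None
    ):
        # Pass the tasks already awaiting preparation so that queue
        # limits count them as occupying slots.
        schd.pre_prep_tasks += schd.pool.release_queued_tasks(
            schd.pre_prep_tasks
        )
    if not schd.pre_prep_tasks:
        return  # nothing to submit
    schd.task_job_mgr.submit_task_jobs(schd.pre_prep_tasks)
```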
19 changes: 14 additions & 5 deletions cylc/flow/task_pool.py
@@ -708,15 +708,24 @@ def queue_task(self, itask: TaskProxy) -> None:
self.data_store_mgr.delta_task_queued(itask)
self.task_queue_mgr.push_task(itask)

def release_queued_tasks(self):
def release_queued_tasks(self, pre_prep_tasks):
"""Return list of queue-released tasks for job prep."""
released = self.task_queue_mgr.release_tasks(
Counter(
[
t.tdef.name for t in self.get_tasks()
if t.state(TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING)
# active tasks
t.tdef.name
for t in self.get_tasks()
if t.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
] + [
# tasks awaiting job preparation which have not yet
# entered the preparing state
itask.tdef.name
for itask in pre_prep_tasks
]
)
)
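The practical effect of the new `pre_prep_tasks` argument is that the queue manager's notion of "active" now includes queue-released tasks that have not yet reached the preparing state, so a queue limit cannot be overrun during the pre-preparation window. A minimal standalone illustration of the counting (plain `collections.Counter`; the tuples are stand-ins for task proxies):

```python
from collections import Counter

# Stand-ins for task proxies: (name, state) pairs.
active = [('a', 'running'), ('a', 'submitted'), ('b', 'preparing')]
pre_prep = [('a', 'waiting')]  # queue-released, not yet preparing

# Everything that should hold a slot against the queue limit.
occupied = Counter(
    [name for name, _ in active]
    + [name for name, _ in pre_prep]
)
print(occupied)  # Counter({'a': 3, 'b': 1})
```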
12 changes: 9 additions & 3 deletions cylc/flow/task_queues/independent.py
@@ -42,6 +42,7 @@ def release(self, active: Counter[str]) -> List[TaskProxy]:
"""Release tasks if below the active limit."""
# The "active" argument counts active tasks by name.
released: List[TaskProxy] = []
held: List[TaskProxy] = []
n_active: int = 0
for mem in self.members:
n_active += active[mem]
@@ -51,9 +52,14 @@ def release(self, active: Counter[str]) -> List[TaskProxy]:
except IndexError:
# deque empty
break
released.append(itask)
n_active += 1
active.update({itask.tdef.name: 1})
if itask.state.is_held:
held.append(itask)
else:
released.append(itask)
n_active += 1
active.update({itask.tdef.name: 1})
for itask in held:
self.deque.appendleft(itask)
return released

def remove(self, itask: TaskProxy) -> bool:
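This is the heart of the held-task fix: a held task popped from the queue is no longer released (and no longer consumes a limit slot); it is set aside and pushed back onto the front of the deque. A self-contained sketch of that behaviour with a minimal stand-in task type — this sketch re-queues held tasks in their original relative order, and treats `limit == 0` as unlimited, matching the queue semantics:

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, List


@dataclass
class Task:
    name: str
    is_held: bool = False


def release(queue: 'Deque[Task]', limit: int, n_active: int) -> List[Task]:
    """Release tasks up to the limit, setting held tasks aside."""
    released: List[Task] = []
    held: List[Task] = []
    while not limit or n_active < limit:
        try:
            itask = queue.popleft()
        except IndexError:
            break  # deque empty
        if itask.is_held:
            held.append(itask)  # does not count towards the limit
        else:
            released.append(itask)
            n_active += 1
    # Put held tasks back at the front of the queue.
    for itask in reversed(held):
        queue.appendleft(itask)
    return released


q = deque([Task('a', is_held=True), Task('b'), Task('c')])
assert [t.name for t in release(q, limit=2, n_active=0)] == ['b', 'c']
assert [t.name for t in q] == ['a']  # held task is still first in line
```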
61 changes: 52 additions & 9 deletions tests/integration/conftest.py
@@ -20,9 +20,8 @@
from pathlib import Path
import pytest
from shutil import rmtree
from typing import List, TYPE_CHECKING, Tuple
from typing import List, TYPE_CHECKING, Tuple, Set

from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import WorkflowConfig
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.rundb import CylcWorkflowDAO
@@ -33,11 +32,13 @@
from .utils.flow_tools import (
_make_flow,
_make_scheduler,
_run_flow
_run_flow,
_start_flow,
)

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_proxy import TaskProxy


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
@@ -160,15 +161,27 @@ def scheduler():


@pytest.fixture(scope='module')
def mod_run(run_dir: Path):
"""Run a module-level flow."""
return partial(_run_flow, run_dir, None)
def mod_start():
"""Start a scheduler but don't set it running (module scope)."""
return partial(_start_flow, None)


@pytest.fixture
def run(run_dir: Path, caplog: pytest.LogCaptureFixture):
"""Run a function-level flow."""
return partial(_run_flow, run_dir, caplog)
def start(caplog: pytest.LogCaptureFixture):
"""Start a scheduler but don't set it running."""
return partial(_start_flow, caplog)


@pytest.fixture(scope='module')
def mod_run():
"""Start a scheduler and set it running (module scope)."""
return partial(_run_flow, None)


@pytest.fixture
def run(caplog: pytest.LogCaptureFixture):
"""Start a scheduler and set it running."""
return partial(_run_flow, caplog)


@pytest.fixture
@@ -316,3 +329,33 @@ def _validate(reg: str, **kwargs) -> None:
)

return _validate


@pytest.fixture
def capture_submission():
"""Suppress job submission and capture submitted tasks.
Provides a function to run on a Scheduler *whilst started*. Use it like so:
async with start(schd):
submitted_tasks = capture_submission(schd)
or:
async with run(schd):
submitted_tasks = capture_submission(schd)
"""

def _disable_submission(schd: 'Scheduler') -> 'Set[TaskProxy]':
submitted_tasks: 'Set[TaskProxy]' = set()

def _submit_task_jobs(_, itasks, *args, **kwargs):
nonlocal submitted_tasks
submitted_tasks.update(itasks)
return itasks

schd.task_job_mgr.submit_task_jobs = _submit_task_jobs # type: ignore
return submitted_tasks

return _disable_submission
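A sketch of how the new pieces combine in a test: `start` brings the scheduler up without running the main loop, and `capture_submission` intercepts `submit_task_jobs` so nothing actually runs. The fixture names are the ones defined above; the workflow configuration and the `paused_start` option are illustrative assumptions, not part of the diff:

```python
async def test_paused_scheduler_submits_nothing(
    flow, scheduler, start, capture_submission
):
    """A paused scheduler should not submit any tasks."""
    reg = flow({
        'scheduler': {'allow implicit tasks': True},
        'scheduling': {'graph': {'R1': 'foo'}},
    })
    schd = scheduler(reg, paused_start=True)  # paused_start: assumed option
    async with start(schd):
        submitted = capture_submission(schd)
        schd.release_queued_tasks()  # attempt a release pass
        assert not submitted  # nothing submitted while paused
```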
34 changes: 24 additions & 10 deletions tests/integration/test_data_store_mgr.py
@@ -27,6 +27,7 @@
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_WAITING,
)
from cylc.flow.wallclock import get_current_time_string

@@ -86,7 +87,7 @@ def ext_id(schd):


@pytest.fixture(scope='module')
async def harness(mod_flow, mod_scheduler, mod_run):
async def harness(mod_flow, mod_scheduler, mod_start):
flow_def = {
'scheduler': {
'allow implicit tasks': True
Expand All @@ -99,11 +100,8 @@ async def harness(mod_flow, mod_scheduler, mod_run):
}
reg: str = mod_flow(flow_def)
schd: 'Scheduler' = mod_scheduler(reg)
async with mod_run(schd):
schd.pool.hold_tasks('*')
schd.resume_workflow()
# Think this is needed to save the data state at first start (?)
# Fails without it.. and a test needs to overwrite schd data with this.
async with mod_start(schd):
await schd.update_data_structure()
data = schd.data_store_mgr.data[schd.data_store_mgr.workflow_id]
yield schd, data

@@ -164,7 +162,7 @@ def test_initiate_data_model(harness):
assert len(data[WORKFLOW].task_proxies) == 2


def test_delta_task_state(harness):
async def test_delta_task_state(harness):
"""Test update_data_structure. This method will generate and
apply the deltas/updates given."""
schd, data = harness
Expand All @@ -178,11 +176,19 @@ def test_delta_task_state(harness):
assert TASK_STATUS_FAILED in set(collect_states(
schd.data_store_mgr.updated, TASK_PROXIES))

# put things back the way we found them
for itask in schd.pool.get_all_tasks():
itask.state.reset(TASK_STATUS_WAITING)
schd.data_store_mgr.delta_task_state(itask)
await schd.update_data_structure()

def test_delta_task_held(harness):

async def test_delta_task_held(harness):
"""Test update_data_structure. This method will generate and
apply the deltas/updates given."""
schd, data = harness
schd.pool.hold_tasks('*')
await schd.update_data_structure()
assert True in {t.is_held for t in data[TASK_PROXIES].values()}
for itask in schd.pool.get_all_tasks():
itask.state.reset(is_held=False)
@@ -192,6 +198,10 @@ def test_delta_task_held(harness):
for t in schd.data_store_mgr.updated[TASK_PROXIES].values()
}

# put things back the way we found them
schd.pool.release_held_tasks('*')
await schd.update_data_structure()


def test_insert_job(harness):
"""Test method that adds a new job to the store."""
@@ -246,12 +256,14 @@ def test_delta_job_time(harness):
)


def test_update_data_structure(harness):
async def test_update_data_structure(harness):
"""Test update_data_structure. This method will generate and
apply the deltas/updates given."""
schd, data = harness
w_id = schd.data_store_mgr.workflow_id
schd.data_store_mgr.data[w_id] = data
schd.pool.hold_tasks('*')
await schd.update_data_structure()
assert TASK_STATUS_FAILED not in set(collect_states(data, TASK_PROXIES))
assert TASK_STATUS_FAILED not in set(collect_states(data, FAMILY_PROXIES))
assert TASK_STATUS_FAILED not in data[WORKFLOW].state_totals
@@ -267,7 +279,9 @@ def test_update_data_structure(harness):
# state totals changed
assert TASK_STATUS_FAILED in data[WORKFLOW].state_totals
# Shows pruning worked
assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 1
# TODO: fixme
# https://github.com/cylc/cylc-flow/issues/4175#issuecomment-1025666413
# assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 1


def test_delta_task_prerequisite(harness):
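Because the harness is module-scoped, each test that holds tasks or resets states now puts things back before the next test runs. If that restore pattern recurs, it could be factored into a fixture along these lines (a hypothetical helper, not part of the diff; it assumes the `harness`, `release_held_tasks` and `update_data_structure` interfaces used above):

```python
import pytest


@pytest.fixture
async def clean_hold_state(harness):
    """Undo task holds applied by a test against the shared scheduler."""
    schd, _ = harness
    yield
    # Release any holds the test left behind, then flush the deltas so
    # the shared data store reflects the restored state.
    schd.pool.release_held_tasks('*')
    await schd.update_data_structure()
```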
(Diffs for the remaining 9 changed files are not shown.)
