Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Sep 24, 2024
1 parent b1fda3d commit f3e770f
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 111 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ def get_script_common_text(this: str, example: Optional[str] = None):
)
Conf(
'run mode', VDR.V_STRING,
options=list(TASK_CONFIG_RUN_MODES) + [''],
options=list(TASK_CONFIG_RUN_MODES),
default=RunMode.LIVE.value,
desc=f'''
For a workflow run in live mode run this task in skip
Expand Down
42 changes: 42 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
install as cylc_install,
get_option_parser as install_gop
)
from cylc.flow.task_state import TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED
from cylc.flow.util import serialise_set
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
Expand All @@ -47,6 +48,7 @@
_start_flow,
)


if TYPE_CHECKING:
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.scheduler import Scheduler
Expand Down Expand Up @@ -672,3 +674,43 @@ async def _reftest(
return triggers

return _reftest


@pytest.fixture
def capture_live_submissions(capcall):
    """Capture live submission attempts.

    This prevents real jobs from being submitted to the system.

    If you call this fixture from a test, it will return a set of tasks that
    would have been submitted had this fixture not been used.
    """
    def fake_submit(self, _workflow, itasks, *_):
        # Route the tasks through simulation mode instead of really
        # submitting them, then fake a submitted -> succeeded lifecycle so
        # the scheduler proceeds as if the jobs had run.
        self.submit_nonlive_task_jobs(_workflow, itasks, 'simulation')
        for itask in itasks:
            for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED):
                self.task_events_mgr.process_message(
                    itask,
                    'INFO',
                    status,
                    '2000-01-01T00:00:00Z',
                    '(received)',
                )
        return itasks

    # suppress and capture live submissions
    submit_live_calls = capcall(
        'cylc.flow.task_job_mgr.TaskJobManager.submit_livelike_task_jobs',
        fake_submit)

    def get_submissions():
        # `submit_live_calls` is only read here so no `nonlocal` is needed;
        # each recorded call is ((self, workflow, itasks, ...), kwargs).
        return {
            itask.identity
            for ((_self, _workflow, itasks, *_), _kwargs) in submit_live_calls
            for itask in itasks
        }

    return get_submissions
106 changes: 40 additions & 66 deletions tests/integration/run_modes/test_mode_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,50 +31,6 @@
import pytest

from cylc.flow.run_modes import WORKFLOW_RUN_MODES
from cylc.flow.task_state import TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED


@pytest.fixture
def capture_live_submissions(capcall, monkeypatch):
    """Capture live submission attempts.

    This prevents real jobs from being submitted to the system.

    If you call this fixture from a test, it will return a set of tasks that
    would have been submitted had this fixture not been used.
    """
    def fake_submit(self, _workflow, itasks, *_):
        # Fake a full submitted -> succeeded lifecycle for each task so the
        # scheduler proceeds as though the jobs had really run.
        for itask in itasks:
            for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED):
                self.task_events_mgr.process_message(
                    itask,
                    'INFO',
                    status,
                    '2000-01-01T00:00:00Z',
                    '(received)',
                )
        return itasks

    # suppress and capture live submissions
    submit_live_calls = capcall(
        'cylc.flow.task_job_mgr.TaskJobManager.submit_livelike_task_jobs',
        fake_submit)

    # Patch out method for writing data store and database which would
    # otherwise cause unrelated failure:
    monkeypatch.setattr(
        'cylc.flow.task_events_mgr.TaskEventsManager._insert_task_job',
        lambda *_, **__: None)

    def get_submissions():
        # NOTE(review): `submit_live_calls` is only read (never rebound)
        # here, so this `nonlocal` declaration is not strictly required.
        nonlocal submit_live_calls
        # Each recorded call is ((self, workflow, itasks, ...), kwargs);
        # flatten them into a set of task identities.
        return {
            itask.identity
            for ((_self, _workflow, itasks, *_), _kwargs) in submit_live_calls
            for itask in itasks
        }

    return get_submissions


@pytest.mark.parametrize('workflow_run_mode', sorted(WORKFLOW_RUN_MODES))
Expand Down Expand Up @@ -118,46 +74,65 @@ async def test_force_trigger_does_not_override_run_mode(
):
"""Force-triggering a task will not override the run mode.
Tasks with run mode = skip will continue to abide by
the is_held flag as normal.
Taken from spec at
https://github.com/cylc/cylc-admin/blob/master/
docs/proposal-skip-mode.md#proposal
https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md#proposal
"""
wid = flow({
'scheduling': {'graph': {'R1': 'foo'}},
'runtime': {'foo': {'run mode': 'skip'}}
})
schd = scheduler(wid)
schd = scheduler(wid, run_mode="live")
async with start(schd):
# Check that task isn't held at first
foo = schd.pool.get_tasks()[0]
assert foo.state.is_held is False

# Hold task, check that it's held:
schd.pool.hold_tasks('1/foo')
assert foo.state.is_held is True

# Trigger task, check that it's _still_ held:
# Force trigger task:
schd.pool.force_trigger_tasks('1/foo', [1])
assert foo.state.is_held is True

# run_mode will always be simulation from test
# workflow before submit routine...
assert not foo.run_mode

# ... but job submission will always change this to the correct mode:
schd.task_job_mgr.submit_task_jobs(
schd.workflow,
[foo],
schd.server.curve_auth,
schd.server.client_pub_key_dir)

assert foo.run_mode == 'skip'


async def test_run_mode_skip_abides_by_held(
    flow,
    scheduler,
    run,
    complete
):
    """A skip-mode task still honours the ``is_held`` flag as normal.

    Taken from spec at
    https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md#proposal
    """
    config = {
        'scheduling': {'graph': {'R1': 'foo'}},
        'runtime': {'foo': {'run mode': 'skip'}},
    }
    schd = scheduler(flow(config), run_mode="live", paused_start=False)
    async with run(schd):
        itask = schd.pool.get_tasks()[0]
        # The task starts out un-held:
        assert itask.state.is_held is False

        # Hold it and confirm the flag flips:
        schd.pool.hold_tasks('1/foo')
        assert itask.state.is_held is True

        # The workflow must NOT run to completion while the task is held,
        # so the completion wait is expected to time out:
        with pytest.raises(
            Exception,
            match="Timeout waiting for workflow to shut down"
        ):
            await complete(schd, timeout=5)


async def test_run_mode_override_from_broadcast(
flow, scheduler, run, complete, log_filter
flow, scheduler, start, complete, log_filter, capture_live_submissions
):
"""Test that run_mode modifications only apply to one task.
"""
Expand All @@ -173,7 +148,7 @@ async def test_run_mode_override_from_broadcast(
id_ = flow(cfg)
schd = scheduler(id_, run_mode='live', paused_start=False)

async with run(schd):
async with start(schd):
schd.broadcast_mgr.put_broadcast(
['1000'], ['foo'], [{'run mode': 'skip'}])

Expand All @@ -184,6 +159,5 @@ async def test_run_mode_override_from_broadcast(
[foo_1000, foo_1001],
schd.server.curve_auth,
schd.server.client_pub_key_dir)

assert foo_1000.run_mode == 'skip'
assert foo_1001.run_mode == 'live'
assert capture_live_submissions() == {'1001/foo'}
21 changes: 14 additions & 7 deletions tests/integration/run_modes/test_nonlive.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
'flow_nums': '[1]',
'is_manual_submit': 0,
'try_num': 1,
'submit_status': None,
'submit_status': 0,
'run_signal': None,
'run_status': None,
'platform_name': 'localhost',
'job_runner_name': 'background',
'run_status': 0,
'platform_name': 'simulation',
'job_runner_name': 'simulation',
'job_id': None},
'skip': {
'flow_nums': '[1]',
Expand All @@ -47,7 +47,7 @@ def not_time(data: Dict[str, Any]):
return {k: v for k, v in data.items() if 'time' not in k}


async def test_task_jobs(flow, scheduler, start):
async def test_task_jobs(flow, scheduler, start, capture_live_submissions):
"""Ensure that task job data is added to the database correctly
for each run mode.
"""
Expand All @@ -58,6 +58,8 @@ async def test_task_jobs(flow, scheduler, start):
mode: {'run mode': mode} for mode in KGO}
}))
async with start(schd):
task_proxies = schd.pool.get_tasks()

schd.task_job_mgr.submit_task_jobs(
schd.workflow,
schd.pool.get_tasks(),
Expand Down Expand Up @@ -88,8 +90,13 @@ async def test_task_jobs(flow, scheduler, start):
assert taskdata == kgo, (
f'Mode {mode}: incorrect db entries.')

assert task_proxies[0].run_mode == 'simulation'
assert task_proxies[1].run_mode == 'skip'


async def test_mean_task_time(flow, scheduler, run, complete):
async def test_mean_task_time(
flow, scheduler, start, complete, capture_live_submissions
):
"""Non-live tasks are not added to the list of task times,
so skipping tasks will not affect how long Cylc expects tasks to run.
"""
Expand All @@ -100,7 +107,7 @@ async def test_mean_task_time(flow, scheduler, run, complete):
'graph': {'P1Y': 'foo'}}
}), run_mode='live')

async with run(schd):
async with start(schd):
itask = schd.pool.get_tasks()[0]
assert list(itask.tdef.elapsed_times) == []

Expand Down
1 change: 0 additions & 1 deletion tests/integration/run_modes/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

"""Test the workings of simulation mode"""

from pathlib import Path
import pytest
from pytest import param

Expand Down
39 changes: 11 additions & 28 deletions tests/integration/run_modes/test_skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,35 @@ async def test_broadcast_changes_set_skip_outputs(
| The skip keyword should not be allowed in custom outputs.
"""
wid = flow({
'scheduling': {'graph': {'R1': 'foo:expect_this'}},
'runtime': {'foo': {'outputs': {'expect_this': 'some message'}}}
'scheduling': {'graph': {'R1': 'foo:x?\nfoo:y?'}},
'runtime': {'foo': {'outputs': {
'x': 'some message', 'y': 'another message'}}}
})
schd = scheduler(wid, run_mode='live')
async with start(schd):
schd.broadcast_mgr.put_broadcast(
['1'],
['foo'],
[{'skip': {'outputs': 'expect_this'}}],
[{'skip': {'outputs': 'x'}}],
)
foo, = schd.pool.get_tasks()
schd.pool.set_prereqs_and_outputs(
'1/foo', ['skip'], [], ['all'])

foo_outputs = foo.state.outputs.get_completed_outputs()

assert 'expect_this' in foo_outputs
assert foo_outputs['expect_this'] == '(manually completed)'
assert foo_outputs == {
'submitted': '(manually completed)',
'started': '(manually completed)',
'succeeded': '(manually completed)',
'x': '(manually completed)'}


async def test_skip_mode_outputs(
flow, scheduler, reftest,
):
"""Nearly a functional test of the output emission of skip mode tasks
"""Skip mode can be configured by the `[runtime][<namespace>][skip]`
section.
Skip mode proposal point 2
https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
Expand Down Expand Up @@ -199,28 +204,6 @@ async def test_doesnt_release_held_tasks(
assert log_filter(log, contains='=> succeeded'), msg.format('succeed')


async def test_force_trigger_doesnt_change_mode(
    flow, scheduler, run, complete
):
    """Force-triggering a task does not override its configured run mode.

    Point 6 from the skip mode proposal
    https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md
    """
    runtime = {
        'slow': {'script': 'sleep 6'},
        'skip': {'script': 'exit 1', 'run mode': 'skip'}
    }
    wid = flow({
        'scheduling': {'graph': {'R1': 'slow => skip'}},
        'runtime': runtime,
    })
    schd = scheduler(wid, run_mode='live', paused_start=False)
    async with run(schd):
        schd.pool.force_trigger_tasks(['1/skip'], [1])
        # If triggering switched the task to live mode, its `exit 1` script
        # would fail it and this wait would time out:
        await complete(schd, '1/skip', timeout=6)


async def test_prereqs_marked_satisfied_by_skip_mode(
flow, scheduler, start, log_filter, complete
):
Expand Down
8 changes: 0 additions & 8 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ def __make_scheduler(id_: str, **opts: Any) -> Scheduler:
schd.workflow_db_mgr.on_workflow_shutdown()


def caplogprinter(caplog):
    """Print every captured log message to stdout (debug aid).

    Args:
        caplog: A pytest ``LogCaptureFixture`` (anything with a
            ``messages`` iterable of strings).
    """
    # A plain loop, not a throwaway list comprehension: we only want the
    # print side effect, not a list of Nones.
    for message in caplog.messages:
        print(message)


@asynccontextmanager
async def _start_flow(
caplog: Optional[pytest.LogCaptureFixture],
Expand All @@ -128,8 +124,6 @@ async def _start_flow(
"""Start a scheduler but don't set the main loop running."""
if caplog:
caplog.set_level(level, CYLC_LOG)
# Debug functionality
caplog.print = lambda: caplogprinter(caplog)

await schd.install()

Expand Down Expand Up @@ -160,8 +154,6 @@ async def _run_flow(
"""Start a scheduler and set the main loop running."""
if caplog:
caplog.set_level(level, CYLC_LOG)
# Debug functionality
caplog.print = lambda: caplogprinter(caplog)

await schd.install()

Expand Down

0 comments on commit f3e770f

Please sign in to comment.