diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index a3efae13831..1da7401d2d5 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -1339,7 +1339,7 @@ def get_script_common_text(this: str, example: Optional[str] = None): ) Conf( 'run mode', VDR.V_STRING, - options=list(TASK_CONFIG_RUN_MODES) + [''], + options=list(TASK_CONFIG_RUN_MODES), default=RunMode.LIVE.value, desc=f''' For a workflow run in live mode run this task in skip diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index bce6ea64e9f..2f0aa5afab4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -33,6 +33,7 @@ install as cylc_install, get_option_parser as install_gop ) +from cylc.flow.task_state import TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED from cylc.flow.util import serialise_set from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id @@ -47,6 +48,7 @@ _start_flow, ) + if TYPE_CHECKING: from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.scheduler import Scheduler @@ -672,3 +674,43 @@ async def _reftest( return triggers return _reftest + + +@pytest.fixture +def capture_live_submissions(capcall, monkeypatch): + """Capture live submission attempts. + + This prevents real jobs from being submitted to the system. + + If you call this fixture from a test, it will return a set of tasks that + would have been submitted had this fixture not been used. + """ + def fake_submit(self, _workflow, itasks, *_): + self.submit_nonlive_task_jobs(_workflow, itasks, 'simulation') + for itask in itasks: + for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED): + self.task_events_mgr.process_message( + itask, + 'INFO', + status, + '2000-01-01T00:00:00Z', + '(received)', + ) + return itasks + + # suppress and capture live submissions + submit_live_calls = capcall( + 'cylc.flow.task_job_mgr.TaskJobManager.submit_livelike_task_jobs', + fake_submit) + + + + def get_submissions(): + nonlocal submit_live_calls + return { + itask.identity + for ((_self, _workflow, itasks, *_), _kwargs) in submit_live_calls + for itask in itasks + } + + return get_submissions diff --git a/tests/integration/run_modes/test_mode_overrides.py b/tests/integration/run_modes/test_mode_overrides.py index cae6a5c710c..c54065e8e21 100644 --- a/tests/integration/run_modes/test_mode_overrides.py +++ b/tests/integration/run_modes/test_mode_overrides.py @@ -31,50 +31,6 @@ import pytest from cylc.flow.run_modes import WORKFLOW_RUN_MODES -from cylc.flow.task_state import TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED - - -@pytest.fixture -def capture_live_submissions(capcall, monkeypatch): - """Capture live submission attempts. - - This prevents real jobs from being submitted to the system. - - If you call this fixture from a test, it will return a set of tasks that - would have been submitted had this fixture not been used. - """ - def fake_submit(self, _workflow, itasks, *_): - for itask in itasks: - for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED): - self.task_events_mgr.process_message( - itask, - 'INFO', - status, - '2000-01-01T00:00:00Z', - '(received)', - ) - return itasks - - # suppress and capture live submissions - submit_live_calls = capcall( - 'cylc.flow.task_job_mgr.TaskJobManager.submit_livelike_task_jobs', - fake_submit) - - # Patch out method for writing data store and database which would - # otherwise cause unrelated failure: - monkeypatch.setattr( - 'cylc.flow.task_events_mgr.TaskEventsManager._insert_task_job', - lambda *_, **__: None) - - def get_submissions(): - nonlocal submit_live_calls - return { - itask.identity - for ((_self, _workflow, itasks, *_), _kwargs) in submit_live_calls - for itask in itasks - } - - return get_submissions @pytest.mark.parametrize('workflow_run_mode', sorted(WORKFLOW_RUN_MODES)) @@ -118,34 +74,19 @@ async def test_force_trigger_does_not_override_run_mode( ): """Force-triggering a task will not override the run mode. - Tasks with run mode = skip will continue to abide by - the is_held flag as normal. - Taken from spec at - https://github.com/cylc/cylc-admin/blob/master/ - docs/proposal-skip-mode.md#proposal + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md#proposal """ wid = flow({ 'scheduling': {'graph': {'R1': 'foo'}}, 'runtime': {'foo': {'run mode': 'skip'}} }) - schd = scheduler(wid) + schd = scheduler(wid, run_mode="live") async with start(schd): - # Check that task isn't held at first foo = schd.pool.get_tasks()[0] - assert foo.state.is_held is False - - # Hold task, check that it's held: - schd.pool.hold_tasks('1/foo') - assert foo.state.is_held is True - # Trigger task, check that it's _still_ held: + # Force trigger task: schd.pool.force_trigger_tasks('1/foo', [1]) - assert foo.state.is_held is True - - # run_mode will always be simulation from test - # workflow before submit routine... - assert not foo.run_mode # ... but job submission will always change this to the correct mode: schd.task_job_mgr.submit_task_jobs( @@ -153,11 +94,45 @@ async def test_force_trigger_does_not_override_run_mode( [foo], schd.server.curve_auth, schd.server.client_pub_key_dir) + assert foo.run_mode == 'skip' +async def test_run_mode_skip_abides_by_held( + flow, + scheduler, + run, + complete +): + """Tasks with run mode = skip will continue to abide by the + is_held flag as normal. + + Taken from spec at + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md#proposal + """ + wid = flow({ + 'scheduling': {'graph': {'R1': 'foo'}}, + 'runtime': {'foo': {'run mode': 'skip'}} + }) + schd = scheduler(wid, run_mode="live", paused_start=False) + async with run(schd): + foo = schd.pool.get_tasks()[0] + assert foo.state.is_held is False + + # Hold task, check that it's held: + schd.pool.hold_tasks('1/foo') + assert foo.state.is_held is True + + # Run to completion, should happen if task isn't held: + with pytest.raises( + Exception, + match="Timeout waiting for workflow to shut down" + ): + await complete(schd, timeout=5) + + async def test_run_mode_override_from_broadcast( - flow, scheduler, run, complete, log_filter + flow, scheduler, start, complete, log_filter, capture_live_submissions ): """Test that run_mode modifications only apply to one task. """ @@ -173,7 +148,7 @@ async def test_run_mode_override_from_broadcast( id_ = flow(cfg) schd = scheduler(id_, run_mode='live', paused_start=False) - async with run(schd): + async with start(schd): schd.broadcast_mgr.put_broadcast( ['1000'], ['foo'], [{'run mode': 'skip'}]) @@ -184,6 +159,5 @@ async def test_run_mode_override_from_broadcast( [foo_1000, foo_1001], schd.server.curve_auth, schd.server.client_pub_key_dir) - assert foo_1000.run_mode == 'skip' - assert foo_1001.run_mode == 'live' + assert capture_live_submissions() == {'1001/foo'} diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py index c6a1ac83b34..973f39e3301 100644 --- a/tests/integration/run_modes/test_nonlive.py +++ b/tests/integration/run_modes/test_nonlive.py @@ -22,11 +22,11 @@ 'flow_nums': '[1]', 'is_manual_submit': 0, 'try_num': 1, - 'submit_status': None, + 'submit_status': 0, 'run_signal': None, - 'run_status': None, - 'platform_name': 'localhost', - 'job_runner_name': 'background', + 'run_status': 0, + 'platform_name': 'simulation', + 'job_runner_name': 'simulation', 'job_id': None}, 'skip': { 'flow_nums': '[1]', @@ -47,7 +47,7 @@ def not_time(data: Dict[str, Any]): return {k: v for k, v in data.items() if 'time' not in k} -async def test_task_jobs(flow, scheduler, start): +async def test_task_jobs(flow, scheduler, start, capture_live_submissions): """Ensure that task job data is added to the database correctly for each run mode. """ @@ -58,6 +58,8 @@ async def test_task_jobs(flow, scheduler, start): mode: {'run mode': mode} for mode in KGO} })) async with start(schd): + task_proxies = schd.pool.get_tasks() + schd.task_job_mgr.submit_task_jobs( schd.workflow, schd.pool.get_tasks(), @@ -88,8 +90,13 @@ async def test_task_jobs(flow, scheduler, start): assert taskdata == kgo, ( f'Mode {mode}: incorrect db entries.') + assert task_proxies[0].run_mode == 'simulation' + assert task_proxies[1].run_mode == 'skip' + -async def test_mean_task_time(flow, scheduler, run, complete): +async def test_mean_task_time( + flow, scheduler, start, complete, capture_live_submissions +): """Non-live tasks are not added to the list of task times, so skipping tasks will not affect how long Cylc expects tasks to run. """ @@ -100,7 +107,7 @@ async def test_mean_task_time(flow, scheduler, run, complete): 'graph': {'P1Y': 'foo'}} }), run_mode='live') - async with run(schd): + async with start(schd): itask = schd.pool.get_tasks()[0] assert list(itask.tdef.elapsed_times) == [] diff --git a/tests/integration/run_modes/test_simulation.py b/tests/integration/run_modes/test_simulation.py index b8a42ff1a27..7beb08e6f5b 100644 --- a/tests/integration/run_modes/test_simulation.py +++ b/tests/integration/run_modes/test_simulation.py @@ -16,7 +16,6 @@ """Test the workings of simulation mode""" -from pathlib import Path import pytest from pytest import param diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index ca44aad08dd..6994e979630 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -78,15 +78,16 @@ async def test_broadcast_changes_set_skip_outputs( | The skip keyword should not be allowed in custom outputs. """ wid = flow({ - 'scheduling': {'graph': {'R1': 'foo:expect_this'}}, - 'runtime': {'foo': {'outputs': {'expect_this': 'some message'}}} + 'scheduling': {'graph': {'R1': 'foo:x?\nfoo:y?'}}, + 'runtime': {'foo': {'outputs': { + 'x': 'some message', 'y': 'another message'}}} }) schd = scheduler(wid, run_mode='live') async with start(schd): schd.broadcast_mgr.put_broadcast( ['1'], ['foo'], - [{'skip': {'outputs': 'expect_this'}}], + [{'skip': {'outputs': 'x'}}], ) foo, = schd.pool.get_tasks() schd.pool.set_prereqs_and_outputs( @@ -94,14 +95,18 @@ async def test_broadcast_changes_set_skip_outputs( foo_outputs = foo.state.outputs.get_completed_outputs() - assert 'expect_this' in foo_outputs - assert foo_outputs['expect_this'] == '(manually completed)' + assert foo_outputs == { + 'submitted': '(manually completed)', + 'started': '(manually completed)', + 'succeeded': '(manually completed)', + 'x': '(manually completed)'} async def test_skip_mode_outputs( flow, scheduler, reftest, ): - """Nearly a functional test of the output emission of skip mode tasks + """Skip mode can be configured by the `[runtime][][skip]` + section. Skip mode proposal point 2 https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md @@ -199,28 +204,6 @@ async def test_doesnt_release_held_tasks( assert log_filter(log, contains='=> succeeded'), msg.format('succeed') -async def test_force_trigger_doesnt_change_mode( - flow, scheduler, run, complete -): - """Point 6 from the skip mode proposal - https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md - - | Force-triggering a task will not override the run mode. - """ - wid = flow({ - 'scheduling': {'graph': {'R1': 'slow => skip'}}, - 'runtime': { - 'slow': {'script': 'sleep 6'}, - 'skip': {'script': 'exit 1', 'run mode': 'skip'} - } - }) - schd = scheduler(wid, run_mode='live', paused_start=False) - async with run(schd): - schd.pool.force_trigger_tasks(['1/skip'], [1]) - # This will timeout if the skip task has become live on triggering: - await complete(schd, '1/skip', timeout=6) - - async def test_prereqs_marked_satisfied_by_skip_mode( flow, scheduler, start, log_filter, complete ): diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 3da32733ffc..fef15e3e3dc 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -115,10 +115,6 @@ def __make_scheduler(id_: str, **opts: Any) -> Scheduler: schd.workflow_db_mgr.on_workflow_shutdown() -def caplogprinter(caplog): - _ = [print(i) for i in caplog.messages] - - @asynccontextmanager async def _start_flow( caplog: Optional[pytest.LogCaptureFixture], @@ -128,8 +124,6 @@ async def _start_flow( """Start a scheduler but don't set the main loop running.""" if caplog: caplog.set_level(level, CYLC_LOG) - # Debug functionality - caplog.print = lambda: caplogprinter(caplog) await schd.install() @@ -160,8 +154,6 @@ async def _run_flow( """Start a scheduler and set the main loop running.""" if caplog: caplog.set_level(level, CYLC_LOG) - # Debug functionality - caplog.print = lambda: caplogprinter(caplog) await schd.install()