From 0b2a1d7295b43d258b504904af78d6e50b2f96c2 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 3 Jan 2024 13:42:47 +0000 Subject: [PATCH] Made execution time limit unsettable by reload or broadcast. Formerly a suppress statement was supressing cases when we wanted to set itask.summary['execution time limit'] = None. Added tests. --- changes.d/5902.fix.md | 2 + cylc/flow/task_job_mgr.py | 40 ++++++++++++++--- tests/integration/test_task_job_mgr.py | 59 ++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 changes.d/5902.fix.md diff --git a/changes.d/5902.fix.md b/changes.d/5902.fix.md new file mode 100644 index 00000000000..677dcd2ab20 --- /dev/null +++ b/changes.d/5902.fix.md @@ -0,0 +1,2 @@ +Fixed a case where reloading or broadcasting execution time limit= +would have no effect. \ No newline at end of file diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 20ee7379d27..985cbd2672d 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -36,7 +36,7 @@ ) from shutil import rmtree from time import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Union, Optional from cylc.flow import LOG from cylc.flow.job_runner_mgr import JobPollContext @@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): itask.submit_num] = itask.platform['name'] itask.summary['job_runner_name'] = itask.platform['job runner'] - with suppress(TypeError): - itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float( - rtconfig['execution time limit'] - ) + + # None is an allowed non-float number for Execution time limit. + itask.summary[ + self.KEY_EXECUTE_TIME_LIMIT + ] = self.get_execution_time_limit(rtconfig['execution time limit']) # Location of job file, etc self._create_job_log_path(workflow, itask) @@ -1281,6 +1282,35 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): job_d=job_d ) + @staticmethod + def get_execution_time_limit( + config_execution_time_limit: Any + ) -> Union[None, float]: + """Get execution time limit from config and process it. + + We are making no assumptions about what can be passed in, + but passing silently and returning None if the value is + not a type convertable to a float: + + Examples: + >>> this = TaskJobManager.get_execution_time_limit + >>> this(None) + >>> this("54") + 54.0 + >>> this({}) + + N.b. + This function does not save you from a value error: + >>> from pytest import raises + >>> with raises(ValueError): + ... this("🇳🇿") + """ + execution_time_limit = None + if config_execution_time_limit is not None: + with suppress(TypeError): + execution_time_limit = float(config_execution_time_limit) + return execution_time_limit + def get_job_conf( self, workflow, diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index 297ad889df5..8e68a720cc0 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from contextlib import suppress import logging from typing import Any as Fixture @@ -128,3 +129,61 @@ async def test__run_job_cmd_logs_platform_lookup_fail( warning = caplog.records[-1] assert warning.levelname == 'ERROR' assert 'Unable to run command jobs-poll' in warning.msg + + +async def test__prep_submit_task_job_impl_handles_execution_time_limit( + flow: Fixture, + scheduler: Fixture, + start: Fixture, + validate: Fixture, + pytestconfig: Fixture +): + """Ensure that emptying the execution time limit unsets it. + + Previously unsetting the etl by either broadcast or reload + would not unset a previous etl. + + See https://github.com/cylc/cylc-flow/issues/5891 + """ + id_ = flow({ + "scheduling": { + "cycling mode": "integer", + "graph": {"R1": "a"} + }, + "runtime": { + "root": {}, + "a": { + "script": "sleep 10", + "execution time limit": 'PT5S' + } + } + }) + # Debugging only: + if pytestconfig.option.verbose > 2: + validate(id_) + + # Run in live mode - function not called in sim mode. + schd = scheduler(id_, run_mode='live') + async with start(schd): + task_a = schd.pool.get_tasks()[0] + # We're not interested in the job file stuff, just + # in the summary state. + with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert task_a.summary['execution_time_limit'] == 5.0 + + # If we delete the etl it gets deleted in the summary: + task_a.tdef.rtconfig['execution time limit'] = None + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert not task_a.summary.get('execution_time_limit', '') + + # put everything back and test broadcast too. + task_a.tdef.rtconfig['execution time limit'] = 5.0 + task_a.summary['execution_time_limit'] = 5.0 + schd.broadcast_mgr.broadcasts = { + '1': {'a': {'execution time limit': None}}} + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert not task_a.summary.get('execution_time_limit', '')