diff --git a/changes.d/5902.fix.md b/changes.d/5902.fix.md new file mode 100644 index 00000000000..803b9dc9590 --- /dev/null +++ b/changes.d/5902.fix.md @@ -0,0 +1 @@ +Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload. \ No newline at end of file diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 20ee7379d27..e41e05dfd30 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -36,7 +36,7 @@ ) from shutil import rmtree from time import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Union, Optional from cylc.flow import LOG from cylc.flow.job_runner_mgr import JobPollContext @@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): itask.submit_num] = itask.platform['name'] itask.summary['job_runner_name'] = itask.platform['job runner'] - with suppress(TypeError): - itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float( - rtconfig['execution time limit'] - ) + + # None is an allowed non-float number for Execution time limit. + itask.summary[ + self.KEY_EXECUTE_TIME_LIMIT + ] = self.get_execution_time_limit(rtconfig['execution time limit']) # Location of job file, etc self._create_job_log_path(workflow, itask) @@ -1281,6 +1282,30 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): job_d=job_d ) + @staticmethod + def get_execution_time_limit( + config_execution_time_limit: Any + ) -> Union[None, float]: + """Get execution time limit from config and process it. + + If the etl from the config is a Falsy then return None. + Otherwise try and parse value as float. + + Examples: + >>> from pytest import raises + >>> this = TaskJobManager.get_execution_time_limit + + >>> this(None) + >>> this("54") + 54.0 + >>> this({}) + >>> with raises(ValueError): + ... this('🇳🇿') + """ + if config_execution_time_limit: + return float(config_execution_time_limit) + return None + def get_job_conf( self, workflow, diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index 297ad889df5..b085162a1da 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from contextlib import suppress import logging from typing import Any as Fixture @@ -128,3 +129,60 @@ async def test__run_job_cmd_logs_platform_lookup_fail( warning = caplog.records[-1] assert warning.levelname == 'ERROR' assert 'Unable to run command jobs-poll' in warning.msg + + +async def test__prep_submit_task_job_impl_handles_execution_time_limit( + flow: Fixture, + scheduler: Fixture, + start: Fixture, +): + """Ensure that emptying the execution time limit unsets it. + + Previously unsetting the etl by either broadcast or reload + would not unset a previous etl. + + See https://github.com/cylc/cylc-flow/issues/5891 + """ + id_ = flow({ + "scheduling": { + "cycling mode": "integer", + "graph": {"R1": "a"} + }, + "runtime": { + "root": {}, + "a": { + "script": "sleep 10", + "execution time limit": 'PT5S' + } + } + }) + + # Run in live mode - function not called in sim mode. + schd = scheduler(id_, run_mode='live') + async with start(schd): + task_a = schd.pool.get_tasks()[0] + # We're not interested in the job file stuff, just + # in the summary state. + with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert task_a.summary['execution_time_limit'] == 5.0 + + # If we delete the etl it gets deleted in the summary: + task_a.tdef.rtconfig['execution time limit'] = None + with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert not task_a.summary.get('execution_time_limit', '') + + # put everything back and test broadcast too. + task_a.tdef.rtconfig['execution time limit'] = 5.0 + task_a.summary['execution_time_limit'] = 5.0 + schd.broadcast_mgr.broadcasts = { + '1': {'a': {'execution time limit': None}}} + with suppress(FileExistsError): + # We run a higher level function here to ensure + # that the broadcast is applied. + schd.task_job_mgr._prep_submit_task_job( + schd.workflow, task_a) + assert not task_a.summary.get('execution_time_limit', '')