From 0b2a1d7295b43d258b504904af78d6e50b2f96c2 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 3 Jan 2024 13:42:47 +0000
Subject: [PATCH 1/6] Made execution time limit unsettable by reload or
broadcast. Formerly a suppress statement was supressing cases when we wanted
to set itask.summary['execution time limit'] = None.
Added tests.
---
changes.d/5902.fix.md | 2 +
cylc/flow/task_job_mgr.py | 40 ++++++++++++++---
tests/integration/test_task_job_mgr.py | 59 ++++++++++++++++++++++++++
3 files changed, 96 insertions(+), 5 deletions(-)
create mode 100644 changes.d/5902.fix.md
diff --git a/changes.d/5902.fix.md b/changes.d/5902.fix.md
new file mode 100644
index 00000000000..677dcd2ab20
--- /dev/null
+++ b/changes.d/5902.fix.md
@@ -0,0 +1,2 @@
+Fixed a case where reloading or broadcasting execution time limit=
+would have no effect.
\ No newline at end of file
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 20ee7379d27..985cbd2672d 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -36,7 +36,7 @@
)
from shutil import rmtree
from time import time
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Any, Union, Optional
from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
@@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig):
itask.submit_num] = itask.platform['name']
itask.summary['job_runner_name'] = itask.platform['job runner']
- with suppress(TypeError):
- itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float(
- rtconfig['execution time limit']
- )
+
+ # None is an allowed non-float number for Execution time limit.
+ itask.summary[
+ self.KEY_EXECUTE_TIME_LIMIT
+ ] = self.get_execution_time_limit(rtconfig['execution time limit'])
# Location of job file, etc
self._create_job_log_path(workflow, itask)
@@ -1281,6 +1282,35 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig):
job_d=job_d
)
+ @staticmethod
+ def get_execution_time_limit(
+ config_execution_time_limit: Any
+ ) -> Union[None, float]:
+ """Get execution time limit from config and process it.
+
+ We are making no assumptions about what can be passed in,
+ but passing silently and returning None if the value is
+ not a type convertable to a float:
+
+ Examples:
+ >>> this = TaskJobManager.get_execution_time_limit
+ >>> this(None)
+ >>> this("54")
+ 54.0
+ >>> this({})
+
+ N.b.
+ This function does not save you from a value error:
+ >>> from pytest import raises
+ >>> with raises(ValueError):
+ ... this("🇳🇿")
+ """
+ execution_time_limit = None
+ if config_execution_time_limit is not None:
+ with suppress(TypeError):
+ execution_time_limit = float(config_execution_time_limit)
+ return execution_time_limit
+
def get_job_conf(
self,
workflow,
diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py
index 297ad889df5..8e68a720cc0 100644
--- a/tests/integration/test_task_job_mgr.py
+++ b/tests/integration/test_task_job_mgr.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+from contextlib import suppress
import logging
from typing import Any as Fixture
@@ -128,3 +129,61 @@ async def test__run_job_cmd_logs_platform_lookup_fail(
warning = caplog.records[-1]
assert warning.levelname == 'ERROR'
assert 'Unable to run command jobs-poll' in warning.msg
+
+
+async def test__prep_submit_task_job_impl_handles_execution_time_limit(
+ flow: Fixture,
+ scheduler: Fixture,
+ start: Fixture,
+ validate: Fixture,
+ pytestconfig: Fixture
+):
+ """Ensure that emptying the execution time limit unsets it.
+
+ Previously unsetting the etl by either broadcast or reload
+ would not unset a previous etl.
+
+ See https://github.com/cylc/cylc-flow/issues/5891
+ """
+ id_ = flow({
+ "scheduling": {
+ "cycling mode": "integer",
+ "graph": {"R1": "a"}
+ },
+ "runtime": {
+ "root": {},
+ "a": {
+ "script": "sleep 10",
+ "execution time limit": 'PT5S'
+ }
+ }
+ })
+ # Debugging only:
+ if pytestconfig.option.verbose > 2:
+ validate(id_)
+
+ # Run in live mode - function not called in sim mode.
+ schd = scheduler(id_, run_mode='live')
+ async with start(schd):
+ task_a = schd.pool.get_tasks()[0]
+ # We're not interested in the job file stuff, just
+ # in the summary state.
+ with suppress(FileExistsError):
+ schd.task_job_mgr._prep_submit_task_job_impl(
+ schd.workflow, task_a, task_a.tdef.rtconfig)
+ assert task_a.summary['execution_time_limit'] == 5.0
+
+ # If we delete the etl it gets deleted in the summary:
+ task_a.tdef.rtconfig['execution time limit'] = None
+ schd.task_job_mgr._prep_submit_task_job_impl(
+ schd.workflow, task_a, task_a.tdef.rtconfig)
+ assert not task_a.summary.get('execution_time_limit', '')
+
+ # put everything back and test broadcast too.
+ task_a.tdef.rtconfig['execution time limit'] = 5.0
+ task_a.summary['execution_time_limit'] = 5.0
+ schd.broadcast_mgr.broadcasts = {
+ '1': {'a': {'execution time limit': None}}}
+ schd.task_job_mgr._prep_submit_task_job_impl(
+ schd.workflow, task_a, task_a.tdef.rtconfig)
+ assert not task_a.summary.get('execution_time_limit', '')
From f1d559f20e3c0b3567feaa8f1cff1ab893155e5c Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 3 Jan 2024 14:47:04 +0000
Subject: [PATCH 2/6] response to verbal review
---
tests/integration/test_task_job_mgr.py | 5 -----
1 file changed, 5 deletions(-)
diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py
index 8e68a720cc0..5bfdf856a8f 100644
--- a/tests/integration/test_task_job_mgr.py
+++ b/tests/integration/test_task_job_mgr.py
@@ -135,8 +135,6 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit(
flow: Fixture,
scheduler: Fixture,
start: Fixture,
- validate: Fixture,
- pytestconfig: Fixture
):
"""Ensure that emptying the execution time limit unsets it.
@@ -158,9 +156,6 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit(
}
}
})
- # Debugging only:
- if pytestconfig.option.verbose > 2:
- validate(id_)
# Run in live mode - function not called in sim mode.
schd = scheduler(id_, run_mode='live')
From 4bc51b958eb1c3ea363f11a20e9092e5ab1bb77e Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 4 Jan 2024 08:59:03 +0000
Subject: [PATCH 3/6] Apply suggestions from code review
Co-authored-by: Oliver Sanders
---
cylc/flow/task_job_mgr.py | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 985cbd2672d..e67b1d1c1c0 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -1305,11 +1305,9 @@ def get_execution_time_limit(
>>> with raises(ValueError):
... this("🇳🇿")
"""
- execution_time_limit = None
- if config_execution_time_limit is not None:
- with suppress(TypeError):
- execution_time_limit = float(config_execution_time_limit)
- return execution_time_limit
+ with suppress(TypeError):
+ return float(config_execution_time_limit)
+ return None
def get_job_conf(
self,
From 95d7a87c354828f00fe4f7a5b473b4a3c3b3821c Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 4 Jan 2024 09:09:29 +0000
Subject: [PATCH 4/6] Prevent false positives in test for broadcast. Test
broadcast using _prep_submit_task_job rather than _prep_submit_task_job_impl
so that the fake broadcast is applied.
---
tests/integration/test_task_job_mgr.py | 28 +++++++++++++++-----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py
index 5bfdf856a8f..b085162a1da 100644
--- a/tests/integration/test_task_job_mgr.py
+++ b/tests/integration/test_task_job_mgr.py
@@ -166,19 +166,23 @@ async def test__prep_submit_task_job_impl_handles_execution_time_limit(
with suppress(FileExistsError):
schd.task_job_mgr._prep_submit_task_job_impl(
schd.workflow, task_a, task_a.tdef.rtconfig)
- assert task_a.summary['execution_time_limit'] == 5.0
+ assert task_a.summary['execution_time_limit'] == 5.0
- # If we delete the etl it gets deleted in the summary:
- task_a.tdef.rtconfig['execution time limit'] = None
+ # If we delete the etl it gets deleted in the summary:
+ task_a.tdef.rtconfig['execution time limit'] = None
+ with suppress(FileExistsError):
schd.task_job_mgr._prep_submit_task_job_impl(
schd.workflow, task_a, task_a.tdef.rtconfig)
- assert not task_a.summary.get('execution_time_limit', '')
+ assert not task_a.summary.get('execution_time_limit', '')
- # put everything back and test broadcast too.
- task_a.tdef.rtconfig['execution time limit'] = 5.0
- task_a.summary['execution_time_limit'] = 5.0
- schd.broadcast_mgr.broadcasts = {
- '1': {'a': {'execution time limit': None}}}
- schd.task_job_mgr._prep_submit_task_job_impl(
- schd.workflow, task_a, task_a.tdef.rtconfig)
- assert not task_a.summary.get('execution_time_limit', '')
+ # put everything back and test broadcast too.
+ task_a.tdef.rtconfig['execution time limit'] = 5.0
+ task_a.summary['execution_time_limit'] = 5.0
+ schd.broadcast_mgr.broadcasts = {
+ '1': {'a': {'execution time limit': None}}}
+ with suppress(FileExistsError):
+ # We run a higher level function here to ensure
+ # that the broadcast is applied.
+ schd.task_job_mgr._prep_submit_task_job(
+ schd.workflow, task_a)
+ assert not task_a.summary.get('execution_time_limit', '')
From 9ec9caa3cfe6c6a4429419a832c572dd0d55a8e4 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 9 Jan 2024 14:24:00 +0000
Subject: [PATCH 5/6] Update changes.d/5902.fix.md
Co-authored-by: Hilary James Oliver
---
changes.d/5902.fix.md | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/changes.d/5902.fix.md b/changes.d/5902.fix.md
index 677dcd2ab20..803b9dc9590 100644
--- a/changes.d/5902.fix.md
+++ b/changes.d/5902.fix.md
@@ -1,2 +1 @@
-Fixed a case where reloading or broadcasting execution time limit=
-would have no effect.
\ No newline at end of file
+Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload.
\ No newline at end of file
From 9e7428683d71c1fdee7c2281ec30227fdf2c9e78 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 9 Jan 2024 15:35:37 +0000
Subject: [PATCH 6/6] made function less tolerant
---
cylc/flow/task_job_mgr.py | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index e67b1d1c1c0..e41e05dfd30 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -1288,24 +1288,21 @@ def get_execution_time_limit(
) -> Union[None, float]:
"""Get execution time limit from config and process it.
- We are making no assumptions about what can be passed in,
- but passing silently and returning None if the value is
- not a type convertable to a float:
+ If the etl from the config is a Falsy then return None.
+ Otherwise try and parse value as float.
Examples:
+ >>> from pytest import raises
>>> this = TaskJobManager.get_execution_time_limit
+
>>> this(None)
>>> this("54")
54.0
>>> this({})
-
- N.b.
- This function does not save you from a value error:
- >>> from pytest import raises
>>> with raises(ValueError):
- ... this("🇳🇿")
+ ... this('🇳🇿')
"""
- with suppress(TypeError):
+ if config_execution_time_limit:
return float(config_execution_time_limit)
return None