Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor post-platforms fix. #3981

Merged
merged 2 commits into from
Dec 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ message PbJob {
optional string script = 21;
optional string shell = 22;
optional string work_sub_dir = 23;
optional string batch_sys_conf = 24;
optional string environment = 25;
optional string directives = 26;
optional string param_var = 28;
Expand Down
115 changes: 50 additions & 65 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ def insert_job(self, job_conf):
work_sub_dir=job_conf['work_d'],
name=name,
cycle_point=point_string,
batch_sys_conf=json.dumps(job_conf['batch_system_conf']),
directives=json.dumps(job_conf['directives']),
environment=json.dumps(job_conf['environment']),
param_var=json.dumps(job_conf['param_var'])
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,6 @@ class Meta:
pre_script = String()
script = String()
work_sub_dir = String()
batch_sys_conf = GenericScalar(resolver=resolve_json_dump)
environment = GenericScalar(resolver=resolve_json_dump)
directives = GenericScalar(resolver=resolve_json_dump)
param_var = GenericScalar(resolver=resolve_json_dump)
Expand Down
50 changes: 29 additions & 21 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,27 @@ def check_job_time(self, itask, now):
else:
return can_poll

def get_host_conf(self, itask, key, default=None, skey="remote"):
"""Return a host setting from suite then global configuration."""
def _get_remote_conf(self, itask, key):
"""Get deprecated "[remote]" items that default to platforms."""
overrides = self.broadcast_mgr.get_broadcast(itask.identity)
if skey in overrides and overrides[skey].get(key) is not None:
ret = overrides[skey][key]
elif itask.tdef.rtconfig[skey].get(key) not in (None, []):
ret = itask.tdef.rtconfig[skey][key]
else:
try:
ret = itask.platform[key]
except (KeyError, ItemNotFoundError):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no way to get ItemNotFoundError here. And KeyError can only occur if an illegal config item is requested.

ret = default
return ret
SKEY = 'remote'
if SKEY not in overrides:
overrides[SKEY] = {}
return (
overrides[SKEY].get(key) or
itask.tdef.rtconfig[SKEY][key] or
itask.platform[key]
)

def _get_suite_platforms_conf(self, itask, key, default):
"""Return top level [runtime] items that default to platforms."""
overrides = self.broadcast_mgr.get_broadcast(itask.identity)
return (
overrides.get(key) or
itask.tdef.rtconfig[key] or
itask.platform[key] or
default
)

def process_events(self, schd_ctx):
"""Process task events that were created by "setup_event_handlers".
Expand Down Expand Up @@ -1001,10 +1009,10 @@ def _setup_job_logs_retrieval(self, itask, event):
host = get_host_from_platform(itask.platform)
if (event not in events or
not is_remote_host(host) or
not self.get_host_conf(itask, "retrieve job logs") or
not self._get_remote_conf(itask, "retrieve job logs") or
id_key in self._event_timers):
return
retry_delays = self.get_host_conf(
retry_delays = self._get_remote_conf(
itask, "retrieve job logs retry delays")
if not retry_delays:
retry_delays = [0]
Expand All @@ -1015,7 +1023,7 @@ def _setup_job_logs_retrieval(self, itask, event):
self.HANDLER_JOB_LOGS_RETRIEVE, # key
self.HANDLER_JOB_LOGS_RETRIEVE, # ctx_type
itask.platform['name'],
self.get_host_conf(itask, "retrieve job logs max size"),
self._get_remote_conf(itask, "retrieve job logs max size"),
),
retry_delays
)
Expand Down Expand Up @@ -1163,9 +1171,9 @@ def _reset_job_timers(self, itask):
timeref = itask.summary['started_time']
timeout_key = 'execution timeout'
timeout = self._get_events_conf(itask, timeout_key)
delays = list(self.get_host_conf(
itask, 'execution polling intervals', skey='job',
default=[900])) # Default 15 minute intervals
delays = list(self._get_suite_platforms_conf(
itask, 'execution polling intervals',
default=[900])) # default 15 minute intervals
if itask.summary[self.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[self.KEY_EXECUTE_TIME_LIMIT]
time_limit_delays = itask.platform.get(
Expand All @@ -1186,9 +1194,9 @@ def _reset_job_timers(self, itask):
timeref = itask.summary['submitted_time']
timeout_key = 'submission timeout'
timeout = self._get_events_conf(itask, timeout_key)
delays = list(self.get_host_conf(
itask, 'submission polling intervals', skey='job',
default=[900])) # Default 15 minute intervals
delays = list(self._get_suite_platforms_conf(
itask, 'submission polling intervals',
default=[900]))
try:
itask.timeout = timeref + float(timeout)
timeout_str = intvl_as_str(timeout)
Expand Down
6 changes: 0 additions & 6 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,11 +1002,6 @@ def _prep_submit_task_job_impl(self, suite, itask, rtconfig):
itask.submit_num] = itask.platform['name']

itask.summary['batch_sys_name'] = itask.platform['batch system']
try:
batch_sys_conf = self.task_events_mgr.get_host_conf(
itask, 'batch systems')[itask.summary['batch_sys_name']]
except (TypeError, KeyError):
batch_sys_conf = {}
try:
itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float(
rtconfig['execution time limit'])
Expand All @@ -1026,7 +1021,6 @@ def _prep_submit_task_job_impl(self, suite, itask, rtconfig):
'batch_submit_command_template': (
itask.platform['batch submit command template']
),
'batch_system_conf': batch_sys_conf,
'dependencies': itask.state.get_resolved_dependencies(),
'directives': rtconfig['directives'],
'environment': rtconfig['environment'],
Expand Down
1 change: 0 additions & 1 deletion tests/integration/test_job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def job_config(schd):
'pre-script': None,
'script': 'sleep 5; echo "I come in peace"',
'work_d': None,
'batch_system_conf': {},
'directives': {},
'environment': {},
'param_var': {},
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/batch_sys_handlers/test_loadleveler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -43,7 +42,6 @@

( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-V': '',
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/batch_sys_handlers/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -40,7 +39,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-B': '',
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/batch_sys_handlers/test_moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -40,7 +39,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-V': '',
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/batch_sys_handlers/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -44,7 +43,6 @@
),
( # super short job name length maximum
{
'batch_system_conf': {'job name length maximum': 6},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -64,7 +62,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-V': '',
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/batch_sys_handlers/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -49,7 +48,6 @@
),
( # task name with % character
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': (
Expand All @@ -73,7 +71,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-p': 'middle',
'--no-requeue': '',
Expand Down
105 changes: 105 additions & 0 deletions tests/unit/test_task_events_mgr_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import Mock
import pytest


from cylc.flow.task_events_mgr import TaskEventsManager


@pytest.mark.parametrize(
"broadcast, remote, platforms, expected",
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
[
("hpc1", "a", "b", "hpc1"),
(None, "hpc1", "b", "hpc1"),
(None, None, "hpc1", "hpc1"),
(None, None, None, None),
]
)
def test_get_remote_conf(remote, platforms, broadcast, expected):
"""Test TaskEventsManager._get_remote_conf()."""

task_events_mgr = TaskEventsManager(
None, None, None, None, None, None, None)

task_events_mgr.broadcast_mgr = Mock(
get_broadcast=lambda x: {
"remote": {
"host": broadcast
}
}
)

itask = Mock(
identity='foo.1',
tdef=Mock(
rtconfig={
'remote': {
'host': remote
}
}
),
platform={
'host': platforms
}
)

assert task_events_mgr._get_remote_conf(itask, 'host') == expected


DEFAULT = [900]


@pytest.mark.parametrize(
"broadcast, suite, platforms, expected",
[
([800], [700], [600], [800]),
(None, [700], [600], [700]),
(None, None, [600], [600]),
(None, None, None, DEFAULT),
]
)
def test_get_suite_platforms_conf(broadcast, suite, platforms, expected):
"""Test TaskEventsManager._get_polling_interval_conf()."""

task_events_mgr = TaskEventsManager(
None, None, None, None, None, None, None)

KEY = "execution polling intervals"

task_events_mgr.broadcast_mgr = Mock(
get_broadcast=lambda x: {
KEY: broadcast
}
)

itask = Mock(
identity='foo.1',
tdef=Mock(
rtconfig={
KEY: suite
}
),
platform={
KEY: platforms
}
)

assert (
task_events_mgr._get_suite_platforms_conf(itask, KEY, DEFAULT) ==
expected
)