Skip to content

Commit

Permalink
Merge pull request #3981 from hjoliver/get-host-conf-fix
Browse files Browse the repository at this point in the history
Minor post-platforms fix.
  • Loading branch information
hjoliver authored Dec 13, 2020
2 parents 468d50d + 1928a1f commit 22a19a6
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 108 deletions.
1 change: 0 additions & 1 deletion cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ message PbJob {
optional string script = 21;
optional string shell = 22;
optional string work_sub_dir = 23;
optional string batch_sys_conf = 24;
optional string environment = 25;
optional string directives = 26;
optional string param_var = 28;
Expand Down
115 changes: 50 additions & 65 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ def insert_job(self, job_conf):
work_sub_dir=job_conf['work_d'],
name=name,
cycle_point=point_string,
batch_sys_conf=json.dumps(job_conf['batch_system_conf']),
directives=json.dumps(job_conf['directives']),
environment=json.dumps(job_conf['environment']),
param_var=json.dumps(job_conf['param_var'])
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,6 @@ class Meta:
pre_script = String()
script = String()
work_sub_dir = String()
batch_sys_conf = GenericScalar(resolver=resolve_json_dump)
environment = GenericScalar(resolver=resolve_json_dump)
directives = GenericScalar(resolver=resolve_json_dump)
param_var = GenericScalar(resolver=resolve_json_dump)
Expand Down
50 changes: 29 additions & 21 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,27 @@ def check_job_time(self, itask, now):
else:
return can_poll

def get_host_conf(self, itask, key, default=None, skey="remote"):
"""Return a host setting from suite then global configuration."""
def _get_remote_conf(self, itask, key):
"""Get deprecated "[remote]" items that default to platforms."""
overrides = self.broadcast_mgr.get_broadcast(itask.identity)
if skey in overrides and overrides[skey].get(key) is not None:
ret = overrides[skey][key]
elif itask.tdef.rtconfig[skey].get(key) not in (None, []):
ret = itask.tdef.rtconfig[skey][key]
else:
try:
ret = itask.platform[key]
except (KeyError, ItemNotFoundError):
ret = default
return ret
SKEY = 'remote'
if SKEY not in overrides:
overrides[SKEY] = {}
return (
overrides[SKEY].get(key) or
itask.tdef.rtconfig[SKEY][key] or
itask.platform[key]
)

def _get_suite_platforms_conf(self, itask, key, default):
"""Return top level [runtime] items that default to platforms."""
overrides = self.broadcast_mgr.get_broadcast(itask.identity)
return (
overrides.get(key) or
itask.tdef.rtconfig[key] or
itask.platform[key] or
default
)

def process_events(self, schd_ctx):
"""Process task events that were created by "setup_event_handlers".
Expand Down Expand Up @@ -1001,10 +1009,10 @@ def _setup_job_logs_retrieval(self, itask, event):
host = get_host_from_platform(itask.platform)
if (event not in events or
not is_remote_host(host) or
not self.get_host_conf(itask, "retrieve job logs") or
not self._get_remote_conf(itask, "retrieve job logs") or
id_key in self._event_timers):
return
retry_delays = self.get_host_conf(
retry_delays = self._get_remote_conf(
itask, "retrieve job logs retry delays")
if not retry_delays:
retry_delays = [0]
Expand All @@ -1015,7 +1023,7 @@ def _setup_job_logs_retrieval(self, itask, event):
self.HANDLER_JOB_LOGS_RETRIEVE, # key
self.HANDLER_JOB_LOGS_RETRIEVE, # ctx_type
itask.platform['name'],
self.get_host_conf(itask, "retrieve job logs max size"),
self._get_remote_conf(itask, "retrieve job logs max size"),
),
retry_delays
)
Expand Down Expand Up @@ -1163,9 +1171,9 @@ def _reset_job_timers(self, itask):
timeref = itask.summary['started_time']
timeout_key = 'execution timeout'
timeout = self._get_events_conf(itask, timeout_key)
delays = list(self.get_host_conf(
itask, 'execution polling intervals', skey='job',
default=[900])) # Default 15 minute intervals
delays = list(self._get_suite_platforms_conf(
itask, 'execution polling intervals',
default=[900])) # default 15 minute intervals
if itask.summary[self.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[self.KEY_EXECUTE_TIME_LIMIT]
time_limit_delays = itask.platform.get(
Expand All @@ -1186,9 +1194,9 @@ def _reset_job_timers(self, itask):
timeref = itask.summary['submitted_time']
timeout_key = 'submission timeout'
timeout = self._get_events_conf(itask, timeout_key)
delays = list(self.get_host_conf(
itask, 'submission polling intervals', skey='job',
default=[900])) # Default 15 minute intervals
delays = list(self._get_suite_platforms_conf(
itask, 'submission polling intervals',
default=[900]))
try:
itask.timeout = timeref + float(timeout)
timeout_str = intvl_as_str(timeout)
Expand Down
6 changes: 0 additions & 6 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,11 +1002,6 @@ def _prep_submit_task_job_impl(self, suite, itask, rtconfig):
itask.submit_num] = itask.platform['name']

itask.summary['batch_sys_name'] = itask.platform['batch system']
try:
batch_sys_conf = self.task_events_mgr.get_host_conf(
itask, 'batch systems')[itask.summary['batch_sys_name']]
except (TypeError, KeyError):
batch_sys_conf = {}
try:
itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float(
rtconfig['execution time limit'])
Expand All @@ -1026,7 +1021,6 @@ def _prep_submit_task_job_impl(self, suite, itask, rtconfig):
'batch_submit_command_template': (
itask.platform['batch submit command template']
),
'batch_system_conf': batch_sys_conf,
'dependencies': itask.state.get_resolved_dependencies(),
'directives': rtconfig['directives'],
'environment': rtconfig['environment'],
Expand Down
1 change: 0 additions & 1 deletion tests/integration/test_job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def job_config(schd):
'pre-script': None,
'script': 'sleep 5; echo "I come in peace"',
'work_d': None,
'batch_system_conf': {},
'directives': {},
'environment': {},
'param_var': {},
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/batch_sys_handlers/test_loadleveler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -43,7 +42,6 @@
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-V': '',
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/batch_sys_handlers/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -40,7 +39,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-B': '',
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/batch_sys_handlers/test_moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -40,7 +39,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-V': '',
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/batch_sys_handlers/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -44,7 +43,6 @@
),
( # super short job name length maximum
{
'batch_system_conf': {'job name length maximum': 6},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -64,7 +62,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-V': '',
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/batch_sys_handlers/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
Expand All @@ -49,7 +48,6 @@
),
( # task name with % character
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': (
Expand All @@ -73,7 +71,6 @@
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-p': 'middle',
'--no-requeue': '',
Expand Down
105 changes: 105 additions & 0 deletions tests/unit/test_task_events_mgr_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import Mock
import pytest


from cylc.flow.task_events_mgr import TaskEventsManager


@pytest.mark.parametrize(
"broadcast, remote, platforms, expected",
[
("hpc1", "a", "b", "hpc1"),
(None, "hpc1", "b", "hpc1"),
(None, None, "hpc1", "hpc1"),
(None, None, None, None),
]
)
def test_get_remote_conf(broadcast, remote, platforms, expected):
"""Test TaskEventsManager._get_remote_conf()."""

task_events_mgr = TaskEventsManager(
None, None, None, None, None, None, None)

task_events_mgr.broadcast_mgr = Mock(
get_broadcast=lambda x: {
"remote": {
"host": broadcast
}
}
)

itask = Mock(
identity='foo.1',
tdef=Mock(
rtconfig={
'remote': {
'host': remote
}
}
),
platform={
'host': platforms
}
)

assert task_events_mgr._get_remote_conf(itask, 'host') == expected


DEFAULT = [900]


@pytest.mark.parametrize(
"broadcast, suite, platforms, expected",
[
([800], [700], [600], [800]),
(None, [700], [600], [700]),
(None, None, [600], [600]),
(None, None, None, DEFAULT),
]
)
def test_get_suite_platforms_conf(broadcast, suite, platforms, expected):
"""Test TaskEventsManager._get_polling_interval_conf()."""

task_events_mgr = TaskEventsManager(
None, None, None, None, None, None, None)

KEY = "execution polling intervals"

task_events_mgr.broadcast_mgr = Mock(
get_broadcast=lambda x: {
KEY: broadcast
}
)

itask = Mock(
identity='foo.1',
tdef=Mock(
rtconfig={
KEY: suite
}
),
platform={
KEY: platforms
}
)

assert (
task_events_mgr._get_suite_platforms_conf(itask, KEY, DEFAULT) ==
expected
)

0 comments on commit 22a19a6

Please sign in to comment.