Skip to content

Commit

Permalink
Merge pull request #6335 from oliver-sanders/6334
Browse files Browse the repository at this point in the history
broadcast: fix issue with multiple namespaces
  • Loading branch information
oliver-sanders authored Aug 30, 2024
2 parents d1d1cc4 + 5df5253 commit 574361b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 15 deletions.
1 change: 1 addition & 0 deletions changes.d/6335.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue that could cause broadcasts made to multiple namespaces to fail.
29 changes: 17 additions & 12 deletions cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,16 @@ def put_broadcast(
bad_namespaces = []

with self.lock:
for setting in settings:
for point_string in point_strings:
for setting in settings or []:
# Coerce setting to cylc runtime object,
# i.e. str to DurationFloat.
coerced_setting = deepcopy(setting)
BroadcastConfigValidator().validate(
coerced_setting,
SPEC['runtime']['__MANY__'],
)

for point_string in point_strings or []:
# Standardise the point and check its validity.
bad_point = False
try:
Expand All @@ -292,26 +300,23 @@ def put_broadcast(
bad_point = True
if not bad_point and point_string not in self.broadcasts:
self.broadcasts[point_string] = {}
for namespace in namespaces:
for namespace in namespaces or []:
if namespace not in self.linearized_ancestors:
bad_namespaces.append(namespace)
elif not bad_point:
if namespace not in self.broadcasts[point_string]:
self.broadcasts[point_string][namespace] = {}

# Keep saved/reported setting as workflow
# config format.
# config format:
modified_settings.append(
(point_string, namespace, deepcopy(setting))
)
# Coerce setting to cylc runtime object,
# i.e. str to DurationFloat.
BroadcastConfigValidator().validate(
setting,
SPEC['runtime']['__MANY__']
(point_string, namespace, setting)
)

# Apply the broadcast with the "coerced" format:
addict(
self.broadcasts[point_string][namespace],
setting
coerced_setting,
)

# Log the broadcast
Expand Down
7 changes: 5 additions & 2 deletions cylc/flow/scripts/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ async def run(options: 'Values', workflow_id):
"""Implement cylc broadcast."""
pclient = get_client(workflow_id, timeout=options.comms_timeout)

# remove any duplicate namespaces
# see https://github.com/cylc/cylc-flow/issues/6334
namespaces = list(set(options.namespaces))

ret: Dict[str, Any] = {
'stdout': [],
'stderr': [],
Expand All @@ -337,7 +341,7 @@ async def run(options: 'Values', workflow_id):
'wFlows': [workflow_id],
'bMode': 'Set',
'cPoints': options.point_strings,
'nSpaces': options.namespaces,
'nSpaces': namespaces,
'bSettings': options.settings,
'bCutoff': options.expire,
}
Expand Down Expand Up @@ -382,7 +386,6 @@ async def run(options: 'Values', workflow_id):
mutation_kwargs['variables']['bMode'] = 'Expire'

# implement namespace and cycle point defaults here
namespaces = options.namespaces
if not namespaces:
namespaces = ["root"]
point_strings = options.point_strings
Expand Down
59 changes: 58 additions & 1 deletion tests/integration/scripts/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.option_parsers import Options
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.scripts.broadcast import _main, get_option_parser


BroadcastOptions = Options(get_option_parser())


async def test_broadcast_multi(
async def test_broadcast_multi_workflow(
one_conf,
flow,
scheduler,
Expand Down Expand Up @@ -77,3 +78,59 @@ async def test_broadcast_multi(
' settings are not compatible with the workflow'
) in out
assert err == ''


async def test_broadcast_multi_namespace(
flow,
scheduler,
start,
db_select,
):
"""Test a multi-namespace broadcast command.
See https://github.com/cylc/cylc-flow/issues/6334
"""
id_ = flow(
{
'scheduling': {
'graph': {'R1': 'a & b & c & fin'},
},
'runtime': {
'root': {'execution time limit': 'PT1S'},
'VOWELS': {'execution time limit': 'PT2S'},
'CONSONANTS': {'execution time limit': 'PT3S'},
'a': {'inherit': 'VOWELS'},
'b': {'inherit': 'CONSONANTS'},
'c': {'inherit': 'CONSONANTS'},
},
}
)
schd = scheduler(id_)

async with start(schd):
# issue a broadcast to multiple namespaces
rets = await _main(
BroadcastOptions(
settings=['execution time limit = PT5S'],
namespaces=['root', 'VOWELS', 'CONSONANTS'],
),
schd.workflow,
)

# the broadcast should succeed
assert list(rets.values()) == [True]

# the broadcast manager should store the "coerced" setting
for task in ['a', 'b', 'c', 'fin']:
assert schd.broadcast_mgr.get_broadcast(
schd.tokens.duplicate(cycle='1', task=task)
) == {'execution time limit': 5.0}

# the database should store the "raw" setting
assert sorted(
db_select(schd, True, CylcWorkflowDAO.TABLE_BROADCAST_STATES)
) == [
('*', 'CONSONANTS', 'execution time limit', 'PT5S'),
('*', 'VOWELS', 'execution time limit', 'PT5S'),
('*', 'root', 'execution time limit', 'PT5S'),
]

0 comments on commit 574361b

Please sign in to comment.