diff --git a/changes.d/6335.fix.md b/changes.d/6335.fix.md new file mode 100644 index 00000000000..2057bae0f7c --- /dev/null +++ b/changes.d/6335.fix.md @@ -0,0 +1 @@ +Fix an issue that could cause broadcasts made to multiple namespaces to fail. diff --git a/cylc/flow/broadcast_mgr.py b/cylc/flow/broadcast_mgr.py index 6cd007ee25a..b9329114302 100644 --- a/cylc/flow/broadcast_mgr.py +++ b/cylc/flow/broadcast_mgr.py @@ -280,8 +280,16 @@ def put_broadcast( bad_namespaces = [] with self.lock: - for setting in settings: - for point_string in point_strings: + for setting in settings or []: + # Coerce setting to cylc runtime object, + # i.e. str to DurationFloat. + coerced_setting = deepcopy(setting) + BroadcastConfigValidator().validate( + coerced_setting, + SPEC['runtime']['__MANY__'], + ) + + for point_string in point_strings or []: # Standardise the point and check its validity. bad_point = False try: @@ -292,26 +300,23 @@ def put_broadcast( bad_point = True if not bad_point and point_string not in self.broadcasts: self.broadcasts[point_string] = {} - for namespace in namespaces: + for namespace in namespaces or []: if namespace not in self.linearized_ancestors: bad_namespaces.append(namespace) elif not bad_point: if namespace not in self.broadcasts[point_string]: self.broadcasts[point_string][namespace] = {} + # Keep saved/reported setting as workflow - # config format. + # config format: modified_settings.append( - (point_string, namespace, deepcopy(setting)) - ) - # Coerce setting to cylc runtime object, - # i.e. str to DurationFloat. - BroadcastConfigValidator().validate( - setting, - SPEC['runtime']['__MANY__'] + (point_string, namespace, setting) ) + + # Apply the broadcast with the "coerced" format: addict( self.broadcasts[point_string][namespace], - setting + coerced_setting, ) # Log the broadcast diff --git a/cylc/flow/scripts/broadcast.py b/cylc/flow/scripts/broadcast.py index c7e5d2a4f3b..2cf068309b7 100755 --- a/cylc/flow/scripts/broadcast.py +++ b/cylc/flow/scripts/broadcast.py @@ -325,6 +325,10 @@ async def run(options: 'Values', workflow_id): """Implement cylc broadcast.""" pclient = get_client(workflow_id, timeout=options.comms_timeout) + # remove any duplicate namespaces + # see https://github.com/cylc/cylc-flow/issues/6334 + namespaces = list(set(options.namespaces)) + ret: Dict[str, Any] = { 'stdout': [], 'stderr': [], @@ -337,7 +341,7 @@ async def run(options: 'Values', workflow_id): 'wFlows': [workflow_id], 'bMode': 'Set', 'cPoints': options.point_strings, - 'nSpaces': options.namespaces, + 'nSpaces': namespaces, 'bSettings': options.settings, 'bCutoff': options.expire, } @@ -382,7 +386,6 @@ async def run(options: 'Values', workflow_id): mutation_kwargs['variables']['bMode'] = 'Expire' # implement namespace and cycle point defaults here - namespaces = options.namespaces if not namespaces: namespaces = ["root"] point_strings = options.point_strings diff --git a/tests/integration/scripts/test_broadcast.py b/tests/integration/scripts/test_broadcast.py index 67a79448041..163d48e552e 100644 --- a/tests/integration/scripts/test_broadcast.py +++ b/tests/integration/scripts/test_broadcast.py @@ -15,13 +15,14 @@ # along with this program. If not, see . from cylc.flow.option_parsers import Options +from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.scripts.broadcast import _main, get_option_parser BroadcastOptions = Options(get_option_parser()) -async def test_broadcast_multi( +async def test_broadcast_multi_workflow( one_conf, flow, scheduler, @@ -77,3 +78,59 @@ async def test_broadcast_multi( ' settings are not compatible with the workflow' ) in out assert err == '' + + +async def test_broadcast_multi_namespace( + flow, + scheduler, + start, + db_select, +): + """Test a multi-namespace broadcast command. + + See https://github.com/cylc/cylc-flow/issues/6334 + """ + id_ = flow( + { + 'scheduling': { + 'graph': {'R1': 'a & b & c & fin'}, + }, + 'runtime': { + 'root': {'execution time limit': 'PT1S'}, + 'VOWELS': {'execution time limit': 'PT2S'}, + 'CONSONANTS': {'execution time limit': 'PT3S'}, + 'a': {'inherit': 'VOWELS'}, + 'b': {'inherit': 'CONSONANTS'}, + 'c': {'inherit': 'CONSONANTS'}, + }, + } + ) + schd = scheduler(id_) + + async with start(schd): + # issue a broadcast to multiple namespaces + rets = await _main( + BroadcastOptions( + settings=['execution time limit = PT5S'], + namespaces=['root', 'VOWELS', 'CONSONANTS'], + ), + schd.workflow, + ) + + # the broadcast should succeed + assert list(rets.values()) == [True] + + # the broadcast manager should store the "coerced" setting + for task in ['a', 'b', 'c', 'fin']: + assert schd.broadcast_mgr.get_broadcast( + schd.tokens.duplicate(cycle='1', task=task) + ) == {'execution time limit': 5.0} + + # the database should store the "raw" setting + assert sorted( + db_select(schd, True, CylcWorkflowDAO.TABLE_BROADCAST_STATES) + ) == [ + ('*', 'CONSONANTS', 'execution time limit', 'PT5S'), + ('*', 'VOWELS', 'execution time limit', 'PT5S'), + ('*', 'root', 'execution time limit', 'PT5S'), + ]