Merge pull request cylc#6341 from cylc/8.3.x-sync
🤖 Merge 8.3.x-sync into master
MetRonnie authored Aug 30, 2024
2 parents 33788b3 + 574361b commit 9db2ca3
Showing 12 changed files with 166 additions and 29 deletions.
1 change: 1 addition & 0 deletions changes.d/6335.fix.md
@@ -0,0 +1 @@
Fix an issue that could cause broadcasts made to multiple namespaces to fail.
1 change: 1 addition & 0 deletions changes.d/6337.fix.md
@@ -0,0 +1 @@
Fix potential duplicate job submissions when manually triggering unqueued active tasks.
29 changes: 17 additions & 12 deletions cylc/flow/broadcast_mgr.py
@@ -280,8 +280,16 @@ def put_broadcast(
bad_namespaces = []

with self.lock:
for setting in settings:
for point_string in point_strings:
for setting in settings or []:
# Coerce setting to cylc runtime object,
# i.e. str to DurationFloat.
coerced_setting = deepcopy(setting)
BroadcastConfigValidator().validate(
coerced_setting,
SPEC['runtime']['__MANY__'],
)

for point_string in point_strings or []:
# Standardise the point and check its validity.
bad_point = False
try:
@@ -292,26 +300,23 @@ def put_broadcast(
bad_point = True
if not bad_point and point_string not in self.broadcasts:
self.broadcasts[point_string] = {}
for namespace in namespaces:
for namespace in namespaces or []:
if namespace not in self.linearized_ancestors:
bad_namespaces.append(namespace)
elif not bad_point:
if namespace not in self.broadcasts[point_string]:
self.broadcasts[point_string][namespace] = {}

# Keep saved/reported setting as workflow
# config format.
# config format:
modified_settings.append(
(point_string, namespace, deepcopy(setting))
)
# Coerce setting to cylc runtime object,
# i.e. str to DurationFloat.
BroadcastConfigValidator().validate(
setting,
SPEC['runtime']['__MANY__']
(point_string, namespace, setting)
)

# Apply the broadcast with the "coerced" format:
addict(
self.broadcasts[point_string][namespace],
setting
coerced_setting,
)

# Log the broadcast
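The hunk above moves setting coercion out of the namespace loop: each setting is deep-copied and validated once, the raw (workflow config format) setting is kept for saving/reporting, and the coerced copy is what gets applied to the in-memory broadcast tree. A minimal sketch of that ordering, using a hypothetical coerce_setting() stand-in rather than cylc's BroadcastConfigValidator, since the real SPEC-driven validation is not reproduced here:

from copy import deepcopy


def coerce_setting(setting: dict) -> dict:
    """Hypothetical stand-in for BroadcastConfigValidator().validate(...).

    Only coerces one duration string to seconds; the real validator works
    against SPEC['runtime']['__MANY__'].
    """
    coerced = deepcopy(setting)
    if coerced.get('execution time limit') == 'PT5S':
        coerced['execution time limit'] = 5.0
    return coerced


def put_broadcast(settings, namespaces, broadcasts):
    """Mirror the patched ordering: coerce once, apply to every namespace."""
    modified = []
    for setting in settings or []:
        # coerce a copy once per setting, outside the namespace loop
        coerced = coerce_setting(setting)
        for namespace in namespaces or []:
            # save/report the raw workflow-config-format setting ...
            modified.append((namespace, deepcopy(setting)))
            # ... but apply the coerced form, so a second namespace never
            # re-validates an already-coerced value
            broadcasts.setdefault(namespace, {}).update(coerced)
    return modified


store = {}
put_broadcast([{'execution time limit': 'PT5S'}], ['root', 'VOWELS'], store)
print(store['VOWELS'])  # {'execution time limit': 5.0}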
3 changes: 1 addition & 2 deletions cylc/flow/install.py
@@ -90,8 +90,7 @@ def _get_logger(rund, log_name, open_file=True):
"""
logger = logging.getLogger(log_name)
if logger.getEffectiveLevel != logging.INFO:
logger.setLevel(logging.INFO)
logger.setLevel(logging.INFO)
if open_file and not logger.hasHandlers():
_open_install_log(rund, logger)
return logger
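The deleted guard compared the method object logger.getEffectiveLevel (note the missing call parentheses) against an integer, so it was always true and the level was set unconditionally anyway; the patch simply drops it. A quick illustration of why the comparison never did anything useful:

import logging

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)

# bound method vs int: always unequal, so the old guard was always True
print(logger.getEffectiveLevel != logging.INFO)    # True
# the intended check needs the call parentheses
print(logger.getEffectiveLevel() != logging.INFO)  # False (20 != 20)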
2 changes: 1 addition & 1 deletion cylc/flow/loggingutil.py
@@ -430,7 +430,7 @@ def patch_log_level(logger: logging.Logger, level: int = logging.INFO):
Defaults to INFO.
"""
orig_level = logger.getEffectiveLevel()
orig_level = logger.level
if level < orig_level:
logger.setLevel(level)
yield
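The one-line change saves the logger's own level rather than its effective level. logger.level stays NOTSET for a logger that inherits from its parent, whereas getEffectiveLevel() walks up the hierarchy; restoring the latter would permanently pin the inherited level onto the logger. A small sketch of the difference:

import logging

parent = logging.getLogger("app")
parent.setLevel(logging.WARNING)
child = logging.getLogger("app.child")  # own level left at NOTSET

print(child.level)                # 0  (NOTSET: the logger's own setting)
print(child.getEffectiveLevel())  # 30 (WARNING, inherited from "app")

# Restoring getEffectiveLevel() after a temporary change would call
# child.setLevel(30) and freeze the inherited level onto the child;
# restoring child.level puts it back to NOTSET so inheritance still works.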
7 changes: 5 additions & 2 deletions cylc/flow/scripts/broadcast.py
@@ -325,6 +325,10 @@ async def run(options: 'Values', workflow_id):
"""Implement cylc broadcast."""
pclient = get_client(workflow_id, timeout=options.comms_timeout)

# remove any duplicate namespaces
# see https://github.com/cylc/cylc-flow/issues/6334
namespaces = list(set(options.namespaces))

ret: Dict[str, Any] = {
'stdout': [],
'stderr': [],
@@ -337,7 +341,7 @@ async def run(options: 'Values', workflow_id):
'wFlows': [workflow_id],
'bMode': 'Set',
'cPoints': options.point_strings,
'nSpaces': options.namespaces,
'nSpaces': namespaces,
'bSettings': options.settings,
'bCutoff': options.expire,
}
@@ -382,7 +386,6 @@ async def run(options: 'Values', workflow_id):
mutation_kwargs['variables']['bMode'] = 'Expire'

# implement namespace and cycle point defaults here
namespaces = options.namespaces
if not namespaces:
namespaces = ["root"]
point_strings = options.point_strings
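Deduplicating the namespaces before the mutation is built means a repeated namespace (for example, if -n root were given twice on the command line) is only broadcast to once. Note that list(set(...)) does not preserve order, which is harmless here because namespace order does not matter:

namespaces = list(set(['root', 'VOWELS', 'root']))
print(sorted(namespaces))  # ['VOWELS', 'root']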
4 changes: 2 additions & 2 deletions cylc/flow/task_queues/__init__.py
@@ -53,8 +53,8 @@ def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]':
pass

@abstractmethod
def remove_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from the queueing system."""
def remove_task(self, itask: 'TaskProxy') -> bool:
"""Try to remove a task from the queues. Return True if done."""
pass

@abstractmethod
12 changes: 5 additions & 7 deletions cylc/flow/task_queues/independent.py
@@ -130,19 +130,17 @@ def release_tasks(self, active: Counter[str]) -> List['TaskProxy']:
self.force_released = set()
return released

def remove_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from whichever queue it belongs to."""
for queue in self.queues.values():
if queue.remove(itask):
break
def remove_task(self, itask: 'TaskProxy') -> bool:
"""Try to remove a task from the queues. Return True if done."""
return any(queue.remove(itask) for queue in self.queues.values())

def force_release_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from whichever queue it belongs to.
To be returned when release_tasks() is next called.
"""
self.remove_task(itask)
self.force_released.add(itask)
if self.remove_task(itask):
self.force_released.add(itask)

def adopt_tasks(self, orphans: List[str]) -> None:
"""Adopt orphaned tasks to the default group."""
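The rewritten remove_task relies on any() over a generator, which short-circuits at the first queue that reports a successful removal (matching the old loop-and-break) while also returning a boolean, so force_release_task only records tasks that really were queued. A self-contained sketch with a hypothetical SimpleQueue standing in for cylc's per-queue objects:

from typing import Dict, List


class SimpleQueue:
    """Hypothetical stand-in for a single task queue."""

    def __init__(self, members: List[str]):
        self.members = list(members)

    def remove(self, item: str) -> bool:
        """Remove item if present; report whether anything was removed."""
        if item in self.members:
            self.members.remove(item)
            return True
        return False


queues: Dict[str, SimpleQueue] = {
    'default': SimpleQueue(['1/a']),
    'big': SimpleQueue(['1/b']),
}


def remove_task(item: str) -> bool:
    # any() over a generator stops at the first queue that returns True,
    # and the overall result says whether the task was queued at all
    return any(queue.remove(item) for queue in queues.values())


print(remove_task('1/b'))  # True  - found and removed from the 'big' queue
print(remove_task('1/z'))  # False - not queued anywhere, nothing released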
4 changes: 2 additions & 2 deletions cylc/flow/tui/util.py
@@ -51,10 +51,10 @@ def suppress_logging():
silly for the duration of this context manager then set it back again
afterwards.
"""
level = LOG.getEffectiveLevel()
orig_level = LOG.level
LOG.setLevel(99999)
yield
LOG.setLevel(level)
LOG.setLevel(orig_level)


def get_task_icon(
59 changes: 58 additions & 1 deletion tests/integration/scripts/test_broadcast.py
@@ -15,13 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.option_parsers import Options
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.scripts.broadcast import _main, get_option_parser


BroadcastOptions = Options(get_option_parser())


async def test_broadcast_multi(
async def test_broadcast_multi_workflow(
one_conf,
flow,
scheduler,
@@ -77,3 +78,59 @@ async def test_broadcast_multi(
' settings are not compatible with the workflow'
) in out
assert err == ''


async def test_broadcast_multi_namespace(
flow,
scheduler,
start,
db_select,
):
"""Test a multi-namespace broadcast command.
See https://github.com/cylc/cylc-flow/issues/6334
"""
id_ = flow(
{
'scheduling': {
'graph': {'R1': 'a & b & c & fin'},
},
'runtime': {
'root': {'execution time limit': 'PT1S'},
'VOWELS': {'execution time limit': 'PT2S'},
'CONSONANTS': {'execution time limit': 'PT3S'},
'a': {'inherit': 'VOWELS'},
'b': {'inherit': 'CONSONANTS'},
'c': {'inherit': 'CONSONANTS'},
},
}
)
schd = scheduler(id_)

async with start(schd):
# issue a broadcast to multiple namespaces
rets = await _main(
BroadcastOptions(
settings=['execution time limit = PT5S'],
namespaces=['root', 'VOWELS', 'CONSONANTS'],
),
schd.workflow,
)

# the broadcast should succeed
assert list(rets.values()) == [True]

# the broadcast manager should store the "coerced" setting
for task in ['a', 'b', 'c', 'fin']:
assert schd.broadcast_mgr.get_broadcast(
schd.tokens.duplicate(cycle='1', task=task)
) == {'execution time limit': 5.0}

# the database should store the "raw" setting
assert sorted(
db_select(schd, True, CylcWorkflowDAO.TABLE_BROADCAST_STATES)
) == [
('*', 'CONSONANTS', 'execution time limit', 'PT5S'),
('*', 'VOWELS', 'execution time limit', 'PT5S'),
('*', 'root', 'execution time limit', 'PT5S'),
]
41 changes: 41 additions & 0 deletions tests/integration/test_task_pool.py
@@ -2111,3 +2111,44 @@ async def test_trigger_queue(one, run, db_select, complete):
one.resume_workflow()
await complete(one, timeout=2)
assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)]


async def test_trigger_unqueued(flow, scheduler, start):
"""Test triggering an unqueued active task.
It should not add to the force_released list.
See https://github.com/cylc/cylc-flow/pull/6337
"""
conf = {
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'graph': {
'R1': 'a & b => c'
}
}
}
schd = scheduler(
flow(conf),
run_mode='simulation',
paused_start=False
)

async with start(schd):
# Release tasks 1/a and 1/b
schd.pool.release_runahead_tasks()
schd.release_queued_tasks()
assert pool_get_task_ids(schd.pool) == ['1/a', '1/b']

# Mark 1/a as succeeded and spawn 1/c
task_a = schd.pool.get_task(IntegerPoint("1"), "a")
schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded')
assert pool_get_task_ids(schd.pool) == ['1/b', '1/c']

# Trigger the partially satisfied (and not queued) task 1/c
schd.pool.force_trigger_tasks(['1/c'], [FLOW_ALL])

# It should not add to the queue manager's force_released list.
assert not schd.pool.task_queue_mgr.force_released, (
"Triggering an unqueued task should not affect the force_released list"
)
32 changes: 32 additions & 0 deletions tests/unit/test_loggingutil.py
@@ -34,6 +34,7 @@
RotatingLogFileHandler,
get_reload_start_number,
get_sorted_logs_by_time,
patch_log_level,
set_timestamps,
)

@@ -245,3 +246,34 @@ def test_log_emit_and_glbl_cfg(
# Check log emit does not access global config object:
LOG.debug("Entering zero gravity")
assert mock_cfg.get.call_args_list == []


def test_patch_log_level(caplog: pytest.LogCaptureFixture):
"""Test patch_log_level temporarily changes the log level."""
caplog.set_level(logging.DEBUG)
logger = logging.getLogger("forest")
logger.setLevel(logging.ERROR)
logger.info("nope")
assert not caplog.records
with patch_log_level(logger, logging.INFO):
LOG.info("yep")
assert len(caplog.records) == 1
logger.info("nope")
assert len(caplog.records) == 1


def test_patch_log_level__reset(caplog: pytest.LogCaptureFixture):
"""Test patch_log_level resets the log level correctly after
use, not affected by the parent logger level - see
https://github.com/cylc/cylc-flow/pull/6327
"""
caplog.set_level(logging.ERROR)
logger = logging.getLogger("woods")
assert logger.level == logging.NOTSET
with patch_log_level(logger, logging.INFO):
logger.info("emitted but not captured, as caplog is at ERROR level")
assert not caplog.records
caplog.set_level(logging.INFO)
logger.info("yep")
assert len(caplog.records) == 1
assert logger.level == logging.NOTSET
