Skip to content

Commit

Permalink
Merge pull request #3541 from hjoliver/queue-warning-c8
Browse files Browse the repository at this point in the history
Queue warning c8
  • Loading branch information
kinow authored Mar 30, 2020
2 parents 0d9d333 + 00dedb4 commit 3b2d9d9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ the cylc/cylc-ui repository (and see also cylc/cylc-uiserver).
The User Guide an other documention has been removed from the Python package to
the cylc/cylc-doc repository.

### Fixes

Cylc 8.0aX (alpha) releases are not compatible with Cylc 7 or with previous
8.0aX releases, as the API is still under heavy development.

Expand All @@ -33,6 +35,9 @@ The xtrigger examples were moved to a separate `cylc/cylc-xtriggers` project
Jinja filters were moved from its `Jinja2Filters` folder to within the `cylc`
namespace, under `cylc.jinja.filters`.

[#3541](https://github.com/cylc/cylc-flow/pull/3541) - Don't warn that a task
was already added to an internal queue, if the queue is the same.

-------------------------------------------------------------------------------
## __cylc-8.0a2 (2019-Q4?)__

Expand Down
30 changes: 17 additions & 13 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,9 +1092,11 @@ def configure_queues(self):
# Then reassign to other queues as requested.
warnings = []
requeued = []
# Record non-default queues by task name, to avoid spurious warnings
# about tasks "already added to a queue", when the queue is the same.
myq = {}
for key, queue in list(queues.copy().items()):
# queues.copy() is essential here to allow items to be removed from
# the queues dict.
myq[key] = []
if key == self.Q_DEFAULT:
continue
# Assign tasks to queue and remove them from default.
Expand All @@ -1109,7 +1111,7 @@ def configure_queues(self):
try:
queues[self.Q_DEFAULT]['members'].remove(fmem)
except ValueError:
if fmem in requeued:
if fmem in requeued and fmem not in myq[key]:
msg = "%s: ignoring %s from %s (%s)" % (
key, fmem, qmember,
'already assigned to a queue')
Expand All @@ -1118,6 +1120,7 @@ def configure_queues(self):
# Ignore: task not used in the graph.
pass
else:
myq[key] = fmem
qmembers.append(fmem)
requeued.append(fmem)
else:
Expand All @@ -1127,31 +1130,32 @@ def configure_queues(self):
queues[self.Q_DEFAULT]['members'].remove(qmember)
except ValueError:
if qmember in requeued:
msg = "%s: ignoring '%s' (%s)" % (
key, qmember, 'task already assigned')
msg = "%s: ignoring %s (%s)" % (
key, qmember,
'already assigned to a queue')
warnings.append(msg)
elif qmember not in all_task_names:
msg = "%s: ignoring '%s' (%s)" % (
msg = "%s: ignoring %s (%s)" % (
key, qmember, 'task not defined')
warnings.append(msg)
else:
# Ignore: task not used in the graph.
pass
else:
myq[key] = qmember
qmembers.append(qmember)
requeued.append(qmember)

if warnings:
err_msg = "Queue configuration warnings:"
for msg in warnings:
err_msg += "\n+ %s" % msg
LOG.warning(err_msg)

if qmembers:
queue['members'] = qmembers
else:
del queues[key]

if warnings:
err_msg = "Queue configuration warnings:"
for msg in warnings:
err_msg += "\n+ %s" % msg
LOG.warning(err_msg)

if cylc.flow.flags.verbose and len(queues) > 1:
log_msg = "Internal queues created:"
for key, queue in queues.items():
Expand Down
25 changes: 25 additions & 0 deletions cylc/flow/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,28 @@ def test_family_inheritance_and_quotes(self):
['MAINFAM_major1_minor10'])
assert 'goodbye_0_major1_minor10' in \
config.runtime['descendants']['SOMEFAM']


def test_queue_config(caplog, tmp_path):
suiterc_content = """
[scheduling]
[[queues]]
[[[q1]]]
members = A, B
[[[q2]]]
members = x
[[dependencies]]
graph = "x => y"
[runtime]
[[A]]
[[B]]
[[x]]
inherit = A, B
[[y]]
"""
suite_rc = tmp_path / "suite.rc"
suite_rc.write_text(suiterc_content)
config = SuiteConfig(suite="qtest", fpath=suite_rc.absolute())
log = caplog.messages[0].split('\n')
assert log[0] == "Queue configuration warnings:"
assert log[1] == "+ q2: ignoring x (already assigned to a queue)"

0 comments on commit 3b2d9d9

Please sign in to comment.