Skip to content

Commit

Permalink
Merge branch 'master' into fix-poll-on-restart
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver authored Feb 10, 2022
2 parents 3edb0c5 + 4313a73 commit a4d55d7
Show file tree
Hide file tree
Showing 17 changed files with 232 additions and 103 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ access to information on configured platforms.
[#4658](https://github.com/cylc/cylc-flow/pull/4658) - At restart, only poll
tasks recorded as active at shutdown.

[#4667](https://github.com/cylc/cylc-flow/pull/4667) - Check manually triggered
tasks are not already preparing for job submission.

[#4640](https://github.com/cylc/cylc-flow/pull/4640) - Fix manual triggering of
runahead-limited parentless tasks.

[#4566](https://github.com/cylc/cylc-flow/pull/4566) - Fix `cylc scan`
invocation for remote scheduler host on a shared filesystem.

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def check_ext_triggers(self, itask, ext_trigger_queue):
"""Get queued ext trigger messages and try to satisfy itask.
Ext-triggers are pushed by the remote end, so we can check for
new messages and satisfy dependendent tasks at the same time.
new messages and satisfy dependent tasks at the same time.
Return True if itask has a newly satisfied ext-trigger.
"""
while not ext_trigger_queue.empty():
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
A workflow will stall if there are no tasks ready to run and no
waiting external triggers, but the presence of incomplete
tasks or unsatisified prerequisites shows the workflow did not run to
tasks or unsatisfied prerequisites shows the workflow did not run to
completion. The stall timer turns off on any post-stall task activity.
It resets on restarting a stalled workflow.
Expand Down Expand Up @@ -388,7 +388,7 @@
.. versionchanged:: 8.0.0
{REPLACES}``[suite servers]run ports``.
It can no longer be used to definine a non-contiguous port
It can no longer be used to define a non-contiguous port
range.
''')
Conf('condemned', VDR.V_ABSOLUTE_HOST_LIST, desc=f'''
Expand Down Expand Up @@ -1173,7 +1173,7 @@
Conf('clean job submission environment', VDR.V_BOOLEAN, False,
desc='''
Job submission subprocesses inherit their parent environment by
default. Remote jobs inherit the default non-interative shell
default. Remote jobs inherit the default non-interactive shell
environment for their platform. Jobs on the scheduler host
inherit the scheduler environment (unless their job runner
prevents this).
Expand Down
34 changes: 19 additions & 15 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,13 +623,14 @@ def get_script_common_text(this: str, example: Optional[str] = None):
''')
Conf('cycling mode', VDR.V_STRING, Calendar.MODE_GREGORIAN,
options=list(Calendar.MODES) + ['integer'], desc='''
Choice of integer cycling or one of several calendars.
Cylc runs using the proleptic Gregorian calendar by default.
This allows you to run the workflow with the 360 day
calendar (12 months of 30 days in a year) OR integer cycling. It
also supports use of the 365 (never a leap year) and 366 (always a
leap year) calendars.
Choice of :term:`integer cycling` or one of several
:term:`datetime cycling` calendars.
Cylc runs workflows using the proleptic Gregorian calendar
by default. This setting allows you to instead choose
integer cycling, or one of the other supported non-Gregorian
datetime calendars: 360 day (12 months of 30 days in a year),
365 day (never a leap year) and 366 day (always a leap year).
''')
Conf('runahead limit', VDR.V_STRING, 'P5', desc='''
How many cycles ahead of the slowest tasks the fastest may run.
Expand Down Expand Up @@ -977,6 +978,16 @@ def get_script_common_text(this: str, example: Optional[str] = None):
If no parents are listed default is ``root``.
''')
Conf('script', VDR.V_STRING, desc=dedent('''
The main custom script invoked from the task job script.
It can be an external command or script, or inlined scripting.
See :ref:`Task Job Script Variables` for the list of variables
available in the task execution environment.
''') + get_script_common_text(
this='script', example='my_script.sh'
))
Conf('init-script', VDR.V_STRING, desc=dedent('''
Custom script invoked by the task job script before the task
execution environment is configured.
Expand Down Expand Up @@ -1036,13 +1047,6 @@ def get_script_common_text(this: str, example: Optional[str] = None):
this='pre-script',
example='echo "Hello from workflow ${CYLC_WORKFLOW_ID}!"'
))
Conf('script', VDR.V_STRING, desc=dedent('''
The main custom script invoked from the task job script.
It can be an external command or script, or inlined scripting.
''') + get_script_common_text(
this='script', example='my_script.sh'
))
Conf('post-script', VDR.V_STRING, desc=dedent('''
Custom script invoked by the task job script immediately
after :cylc:conf:`[..]script`.
Expand Down Expand Up @@ -1126,7 +1130,7 @@ def get_script_common_text(this: str, example: Optional[str] = None):
with Conf('meta', desc=r'''
Metadata for the task or task family.
The ``meta`` section containins metadata items for this task or
The ``meta`` section contains metadata items for this task or
family namespace. The items ``title``, ``description`` and
``URL`` are pre-defined and are used by Cylc. Others can be
user-defined and passed to task event handlers to be
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/main_loop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def my_startup_coroutine(schd, state):
^^^^^^^^^^^
Coroutines must be decorated using one of the main loop decorators. The
choise of decorator effects when the coroutine is called and what
choice of decorator effects when the coroutine is called and what
arguments are provided to it.
The available event types are:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def get_graph_raw(
Task identifier for the dependency of
an edge.
right (str):
Task identifier for the dependant task
Task identifier for the dependent task
of an edge.
is_suicide (bool):
True if edge represents a suicide trigger.
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/parsec/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class ParsecValidator:
),
V_RANGE: (
'integer range',
'An integer range specified by a minumum and maximum value.',
'An integer range specified by a minimum and maximum value.',
{
'1..5': 'The numbers 1 to 5 inclusive.',
}
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def main(_, options: 'Values', workflow_id: str) -> None:
item['name'],
item['cyclePoint'],
item['state']]
values.append('held' if item['isHeld'] else 'unheld')
values.append('held' if item['isHeld'] else 'not-held')
values.append('queued' if item['isQueued']
else 'not-queued')
values.append('runahead' if item['isRunahead']
Expand Down
28 changes: 14 additions & 14 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,9 @@ def load_db_tasks_to_hold(self):
self.workflow_db_mgr.pri_dao.select_tasks_to_hold()
)

def spawn_successor(self, itask: TaskProxy) -> Optional[TaskProxy]:
def spawn_successor_if_parentless(
self, itask: TaskProxy
) -> Optional[TaskProxy]:
"""Spawn next-cycle instance of itask if parentless.
This includes:
Expand Down Expand Up @@ -597,17 +599,15 @@ def release_runahead_task(
# implicit prev-instance parent
return

if not itask.flow_nums:
# No reflow
return

if not runahead_limit_point:
return

# Autospawn successor of itask if parentless.
n_task = self.spawn_successor(itask)
if n_task and n_task.point <= runahead_limit_point:
self.release_runahead_task(n_task, runahead_limit_point)
# Autospawn successor of itask if parentless and reflowing.
if itask.flow_nums:
n_task = self.spawn_successor_if_parentless(itask)
if (
n_task and
runahead_limit_point and
n_task.point <= runahead_limit_point
):
self.release_runahead_task(n_task, runahead_limit_point)

def remove(self, itask, reason=""):
"""Remove a task from the pool (e.g. after a reload)."""
Expand Down Expand Up @@ -1425,9 +1425,9 @@ def force_trigger_tasks(
# In pool already
itasks.append(itask)

# trigger tasks
# trigger tasks if not already preparing or active
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue
itask.is_manual_submit = True
Expand Down
2 changes: 1 addition & 1 deletion tests/flakyfunctional/hold-release/14-hold-kill/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
sleep 10 # sleep, should still be held after 10 seconds
cylc dump -s -t "${CYLC_WORKFLOW_ID}" >'cylc-dump.out'
diff -u 'cylc-dump.out' - <<'__OUT__'
1, killer, running, unheld, not-queued, not-runahead
1, killer, running, not-held, not-queued, not-runahead
1, sleeper, waiting, held, not-queued, not-runahead
__OUT__
cylc release "${CYLC_WORKFLOW_ID}//1/sleeper"
Expand Down
3 changes: 3 additions & 0 deletions tests/functional/cli/05-colour.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
# Uses the "script" command to make stdout log file look like a terminal.

. "$(dirname "$0")/test_header"
if [[ "$OSTYPE" != "linux-gnu"* ]]; then
skip_all "Tests not compatibile with $OSTYPE"
fi
set_test_number 8

ANSI='\e\['
Expand Down
25 changes: 25 additions & 0 deletions tests/functional/spawn-on-demand/13-trigger-runahead.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Check correct behaviour if a parentless task is manually triggered whilst
# runahead-limited. See GitHub #4619.

. "$(dirname "$0")/test_header"
set_test_number 2
reftest
exit
21 changes: 21 additions & 0 deletions tests/functional/spawn-on-demand/13-trigger-runahead/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# This workflow should behave the same as if foo.1 did not trigger foo.2.

[scheduling]
cycling mode = integer
initial cycle point = 1
final cycle point = 3
runahead limit = P1
[[graph]]
P1 = foo
[runtime]
[[foo]]
script = """
cylc__job__wait_cylc_message_started
if ((CYLC_TASK_CYCLE_POINT == 1)); then
expected="foo, 1, running, not-held, not-queued, not-runahead
foo, 2, waiting, not-held, not-queued, runahead"
diff <(cylc dump -t "${CYLC_WORKFLOW_NAME}") <(echo "$expected")
# Force trigger next instance while it is runahead limited.
cylc trigger $CYLC_WORKFLOW_NAME//2/foo
fi
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Initial point: 1
Final point: 3
1/foo -triggered off []
2/foo -triggered off []
3/foo -triggered off []
33 changes: 33 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
from cylc.flow.cycling import PointBase
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
)


# NOTE: foo & bar have no parents so at start-up (even with the workflow
Expand Down Expand Up @@ -405,3 +412,29 @@ async def test_hold_point(

assert task_pool.tasks_to_hold == set()
assert db_select(example_flow, True, 'tasks_to_hold') == []


@pytest.mark.parametrize(
'status,should_trigger',
[
(TASK_STATUS_WAITING, True),
(TASK_STATUS_PREPARING, False),
(TASK_STATUS_SUBMITTED, False),
(TASK_STATUS_RUNNING, False),
(TASK_STATUS_SUCCEEDED, True),
]
)
async def test_trigger_states(status, should_trigger, one, run):
"""It should only trigger tasks in compatible states."""

async with run(one):
task = one.pool.filter_task_proxies('1/a')[0][0]

# reset task a to the provided state
task.state.reset(status)

# try triggering the task
one.pool.force_trigger_tasks('1/a')

# check whether the task triggered
assert task.is_manual_submit == should_trigger
Loading

0 comments on commit a4d55d7

Please sign in to comment.