Skip to content

Commit

Permalink
Merge pull request #6379 from cylc/8.3.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.3.x-sync into master
  • Loading branch information
MetRonnie authored Sep 26, 2024
2 parents a722b26 + e5b0cef commit 85d11b4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 45 deletions.
9 changes: 4 additions & 5 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
from cylc.flow.cycling import PointBase
from cylc.flow.flow_mgr import FlowNums
from cylc.flow.prerequisite import Prerequisite
from cylc.flow.scheduler import Scheduler

EDGES = 'edges'
FAMILIES = 'families'
Expand Down Expand Up @@ -469,7 +470,7 @@ class DataStoreMgr:
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: '

def __init__(self, schd, n_edge_distance=1):
self.schd = schd
self.schd: Scheduler = schd
self.id_ = Tokens(
user=self.schd.owner,
workflow=self.schd.workflow,
Expand Down Expand Up @@ -1183,10 +1184,7 @@ def generate_ghost_task(
t_id = self.definition_id(name)

if itask is None:
itask = self.schd.pool.get_task(point_string, name)

if itask is None:
itask = TaskProxy(
itask = self.schd.pool.get_task(point_string, name) or TaskProxy(
self.id_,
self.schd.config.get_taskdef(name),
point,
Expand Down Expand Up @@ -1227,6 +1225,7 @@ def generate_ghost_task(
depth=task_def.depth,
graph_depth=n_depth,
name=name,
flow_nums=serialise_set(flow_nums),
)
self.all_n_window_nodes.add(tp_id)
self.n_window_depths.setdefault(n_depth, set()).add(tp_id)
Expand Down
8 changes: 5 additions & 3 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
NamedTuple,
Expand Down Expand Up @@ -437,6 +438,9 @@ class TaskEventsManager():

workflow_cfg: Dict[str, Any]
uuid_str: str
# To be set by the task pool:
spawn_func: Callable[['TaskProxy', str], Any]

mail_interval: float = 0
mail_smtp: Optional[str] = None
mail_footer: Optional[str] = None
Expand All @@ -459,8 +463,6 @@ def __init__(
self._event_timers: Dict[EventKey, Any] = {}
# NOTE: flag for DB use
self.event_timers_updated = True
# To be set by the task pool:
self.spawn_func = None
self.timestamp = timestamp
self.bad_hosts = bad_hosts

Expand Down Expand Up @@ -1966,7 +1968,7 @@ def reset_bad_hosts(self):
)
self.bad_hosts.clear()

def spawn_children(self, itask, output):
def spawn_children(self, itask: 'TaskProxy', output: str) -> None:
# update DB task outputs
self.workflow_db_mgr.put_update_task_outputs(itask)
# spawn child-tasks
Expand Down
41 changes: 23 additions & 18 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,9 @@ def get_or_spawn_task(
is_xtrig_sequential = False
if ntask is None:
# ntask does not exist: spawn it in the flow.
ntask = self.spawn_task(tdef.name, point, flow_nums, flow_wait)
ntask = self.spawn_task(
tdef.name, point, flow_nums, flow_wait=flow_wait
)
# if the task was found set xtrigger checking type.
# otherwise find the xtrigger type if it can't spawn
# for whatever reason.
Expand All @@ -765,7 +767,12 @@ def get_or_spawn_task(
# ntask may still be None
return ntask, is_in_pool, is_xtrig_sequential

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
def spawn_to_rh_limit(
self,
tdef: 'TaskDef',
point: Optional['PointBase'],
flow_nums: 'FlowNums',
) -> None:
"""Spawn parentless task instances from point to runahead limit.
Sequentially checked xtriggers will spawn the next occurrence of their
Expand All @@ -780,16 +787,14 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
return
if self.runahead_limit_point is None:
self.compute_runahead()
if self.runahead_limit_point is None:
return

is_xtrig_sequential = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point):
ntask, is_in_pool, is_xtrig_sequential = (
self.get_or_spawn_task(
point,
tdef,
flow_nums
)
self.get_or_spawn_task(point, tdef, flow_nums)
)
if ntask is not None:
if not is_in_pool:
Expand Down Expand Up @@ -1328,7 +1333,7 @@ def check_abort_on_task_fails(self):
"""
return self.abort_task_failed

def spawn_on_output(self, itask, output, forced=False):
def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
"""Spawn child-tasks of given output, into the pool.
Remove the parent task from the pool if complete.
Expand All @@ -1349,7 +1354,6 @@ def spawn_on_output(self, itask, output, forced=False):
Args:
output: output to spawn on.
forced: True if called from manual set task command
"""
if (
Expand All @@ -1360,7 +1364,7 @@ def spawn_on_output(self, itask, output, forced=False):
self.abort_task_failed = True

children = []
if itask.flow_nums or forced:
if itask.flow_nums:
with suppress(KeyError):
children = itask.graph_children[output]

Expand Down Expand Up @@ -1391,10 +1395,7 @@ def spawn_on_output(self, itask, output, forced=False):
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
):
elif c_task is None and itask.flow_nums:
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
Expand All @@ -1418,7 +1419,10 @@ def spawn_on_output(self, itask, output, forced=False):
if not in_pool:
self.add_to_pool(t)

if t.point <= self.runahead_limit_point:
if (
self.runahead_limit_point is not None
and t.point <= self.runahead_limit_point
):
self.rh_release_and_queue(t)

# Event-driven suicide.
Expand Down Expand Up @@ -1658,7 +1662,6 @@ def spawn_task(
name: str,
point: 'PointBase',
flow_nums: Set[int],
force: bool = False,
flow_wait: bool = False,
) -> Optional[TaskProxy]:
"""Return a new task proxy for the given flow if possible.
Expand Down Expand Up @@ -1723,7 +1726,7 @@ def spawn_task(
if prev_flow_wait:
self._spawn_after_flow_wait(itask)

if itask.transient and not force:
if itask.transient:
return None

if not itask.transient:
Expand Down Expand Up @@ -2016,7 +2019,9 @@ def _set_prereqs_tdef(
):
"""Spawn a future task and set prerequisites on it."""

itask = self.spawn_task(taskdef.name, point, flow_nums, flow_wait)
itask = self.spawn_task(
taskdef.name, point, flow_nums, flow_wait=flow_wait
)
if itask is None:
return
if self._set_prereqs_itask(itask, prereqs, flow_nums):
Expand Down
28 changes: 9 additions & 19 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1087,12 +1087,9 @@ async def test_no_flow_tasks_dont_spawn(
'R1': 'a => b => c'
}
},
'scheduler': {
'allow implicit tasks': 'true',
},
})

schd = scheduler(id_)
schd: Scheduler = scheduler(id_)
async with start(schd):
task_a = schd.pool.get_tasks()[0]

Expand All @@ -1101,37 +1098,30 @@ async def test_no_flow_tasks_dont_spawn(

# Set as completed: should not spawn children.
schd.pool.set_prereqs_and_outputs(
[task_a.identity], None, None, [FLOW_NONE])
[task_a.identity], [], [], [FLOW_NONE]
)
assert not schd.pool.get_tasks()

for flow_nums, force, pool in (
for flow_nums, expected_pool in (
# outputs yielded from a no-flow task should not spawn downstreams
(set(), False, []),
# forced spawning downstream of a no-flow task should spawn
# downstreams with flow_nums={}
(set(), True, [('1/b', set())]),
(set(), []),
# outputs yielded from a task with flow numbers should spawn
# downstreams in the same flow
({1}, False, [('1/b', {1})]),
# forced spawning should work in the same way
({1}, True, [('1/b', {1})]),
({1}, [('1/b', {1})]),
):
# set the flow-nums on 1/a
task_a.flow_nums = flow_nums

# spawn on the succeeded output
schd.pool.spawn_on_output(
task_a,
TASK_OUTPUT_SUCCEEDED,
forced=force,
)
schd.pool.spawn_on_output(task_a, TASK_OUTPUT_SUCCEEDED)

schd.pool.spawn_on_all_outputs(task_a)

# ensure the pool is as expected
assert [
(itask.identity, itask.flow_nums)
for itask in schd.pool.get_tasks()
] == pool
] == expected_pool


async def test_task_proxy_remove_from_queues(
Expand Down

0 comments on commit 85d11b4

Please sign in to comment.