Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential type error & a few small mistakes #6357

Merged
4 commits merged on Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
if TYPE_CHECKING:
from cylc.flow.cycling import PointBase
from cylc.flow.flow_mgr import FlowNums
from cylc.flow.scheduler import Scheduler

EDGES = 'edges'
FAMILIES = 'families'
Expand Down Expand Up @@ -468,7 +469,7 @@ class DataStoreMgr:
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: '

def __init__(self, schd, n_edge_distance=1):
self.schd = schd
self.schd: Scheduler = schd
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
self.id_ = Tokens(
user=self.schd.owner,
workflow=self.schd.workflow,
Expand Down Expand Up @@ -1182,10 +1183,7 @@ def generate_ghost_task(
t_id = self.definition_id(name)

if itask is None:
itask = self.schd.pool.get_task(point_string, name)

if itask is None:
itask = TaskProxy(
itask = self.schd.pool.get_task(point_string, name) or TaskProxy(
self.id_,
self.schd.config.get_taskdef(name),
point,
Expand Down Expand Up @@ -1226,6 +1224,7 @@ def generate_ghost_task(
depth=task_def.depth,
graph_depth=n_depth,
name=name,
flow_nums=serialise_set(flow_nums),
)
self.all_n_window_nodes.add(tp_id)
self.n_window_depths.setdefault(n_depth, set()).add(tp_id)
Expand Down
8 changes: 5 additions & 3 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
NamedTuple,
Expand Down Expand Up @@ -437,6 +438,9 @@ class TaskEventsManager():

workflow_cfg: Dict[str, Any]
uuid_str: str
# To be set by the task pool:
spawn_func: Callable[['TaskProxy', str], Any]

mail_interval: float = 0
mail_smtp: Optional[str] = None
mail_footer: Optional[str] = None
Expand All @@ -459,8 +463,6 @@ def __init__(
self._event_timers: Dict[EventKey, Any] = {}
# NOTE: flag for DB use
self.event_timers_updated = True
# To be set by the task pool:
self.spawn_func = None
self.timestamp = timestamp
self.bad_hosts = bad_hosts

Expand Down Expand Up @@ -1966,7 +1968,7 @@ def reset_bad_hosts(self):
)
self.bad_hosts.clear()

def spawn_children(self, itask, output):
def spawn_children(self, itask: 'TaskProxy', output: str) -> None:
# update DB task outputs
self.workflow_db_mgr.put_update_task_outputs(itask)
# spawn child-tasks
Expand Down
41 changes: 23 additions & 18 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,9 @@
is_xtrig_sequential = False
if ntask is None:
# ntask does not exist: spawn it in the flow.
ntask = self.spawn_task(tdef.name, point, flow_nums, flow_wait)
ntask = self.spawn_task(
tdef.name, point, flow_nums, flow_wait=flow_wait
)
# if the task was found set xtrigger checking type.
# otherwise find the xtrigger type if it can't spawn
# for whatever reason.
Expand All @@ -764,7 +766,12 @@
# ntask may still be None
return ntask, is_in_pool, is_xtrig_sequential

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
def spawn_to_rh_limit(
self,
tdef: 'TaskDef',
point: Optional['PointBase'],
flow_nums: 'FlowNums',
) -> None:
"""Spawn parentless task instances from point to runahead limit.

Sequentially checked xtriggers will spawn the next occurrence of their
Expand All @@ -779,16 +786,14 @@
return
if self.runahead_limit_point is None:
self.compute_runahead()
if self.runahead_limit_point is None:
return

Check warning on line 790 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L790

Added line #L790 was not covered by tests

is_xtrig_sequential = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point):
ntask, is_in_pool, is_xtrig_sequential = (
self.get_or_spawn_task(
point,
tdef,
flow_nums
)
self.get_or_spawn_task(point, tdef, flow_nums)
)
if ntask is not None:
if not is_in_pool:
Expand Down Expand Up @@ -1329,7 +1334,7 @@
"""
return self.abort_task_failed

def spawn_on_output(self, itask, output, forced=False):
def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
"""Spawn child-tasks of given output, into the pool.

Remove the parent task from the pool if complete.
Expand All @@ -1350,7 +1355,6 @@

Args:
output: output to spawn on.
forced: True if called from manual set task command

"""
if (
Expand All @@ -1361,7 +1365,7 @@
self.abort_task_failed = True

children = []
if itask.flow_nums or forced:
if itask.flow_nums:
with suppress(KeyError):
children = itask.graph_children[output]

Expand Down Expand Up @@ -1392,10 +1396,7 @@
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
):
elif c_task is None and itask.flow_nums:
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
Expand All @@ -1419,7 +1420,10 @@
if not in_pool:
self.add_to_pool(t)

if t.point <= self.runahead_limit_point:
if (
self.runahead_limit_point is not None
and t.point <= self.runahead_limit_point
):
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
self.rh_release_and_queue(t)

# Event-driven suicide.
Expand Down Expand Up @@ -1659,7 +1663,6 @@
name: str,
point: 'PointBase',
flow_nums: Set[int],
force: bool = False,
flow_wait: bool = False,
) -> Optional[TaskProxy]:
"""Return a new task proxy for the given flow if possible.
Expand Down Expand Up @@ -1724,7 +1727,7 @@
if prev_flow_wait:
self._spawn_after_flow_wait(itask)

if itask.transient and not force:
if itask.transient:
return None

if not itask.transient:
Expand Down Expand Up @@ -2017,7 +2020,9 @@
):
"""Spawn a future task and set prerequisites on it."""

itask = self.spawn_task(taskdef.name, point, flow_nums, flow_wait)
itask = self.spawn_task(
taskdef.name, point, flow_nums, flow_wait=flow_wait
)
if itask is None:
return
if self._set_prereqs_itask(itask, prereqs, flow_nums):
Expand Down
28 changes: 9 additions & 19 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,12 +1085,9 @@ async def test_no_flow_tasks_dont_spawn(
'R1': 'a => b => c'
}
},
'scheduler': {
'allow implicit tasks': 'true',
},
})

schd = scheduler(id_)
schd: Scheduler = scheduler(id_)
async with start(schd):
task_a = schd.pool.get_tasks()[0]

Expand All @@ -1099,37 +1096,30 @@ async def test_no_flow_tasks_dont_spawn(

# Set as completed: should not spawn children.
schd.pool.set_prereqs_and_outputs(
[task_a.identity], None, None, [FLOW_NONE])
[task_a.identity], [], [], [FLOW_NONE]
)
assert not schd.pool.get_tasks()

for flow_nums, force, pool in (
for flow_nums, expected_pool in (
# outputs yielded from a no-flow task should not spawn downstreams
(set(), False, []),
# forced spawning downstream of a no-flow task should spawn
# downstreams with flow_nums={}
(set(), True, [('1/b', set())]),
(set(), []),
# outputs yielded from a task with flow numbers should spawn
# downstreams in the same flow
({1}, False, [('1/b', {1})]),
# forced spawning should work in the same way
({1}, True, [('1/b', {1})]),
({1}, [('1/b', {1})]),
):
# set the flow-nums on 1/a
task_a.flow_nums = flow_nums

# spawn on the succeeded output
schd.pool.spawn_on_output(
task_a,
TASK_OUTPUT_SUCCEEDED,
forced=force,
)
schd.pool.spawn_on_output(task_a, TASK_OUTPUT_SUCCEEDED)

schd.pool.spawn_on_all_outputs(task_a)

# ensure the pool is as expected
assert [
(itask.identity, itask.flow_nums)
for itask in schd.pool.get_tasks()
] == pool
] == expected_pool


async def test_task_proxy_remove_from_queues(
Expand Down
Loading