Skip to content

Commit

Permalink
only create one xtrigger per retry type per task
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Oct 24, 2019
1 parent 2ab9536 commit 05a6f34
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 57 deletions.
74 changes: 53 additions & 21 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,57 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd_ctx):
except KeyError as exc:
LOG.exception(exc)

def _get_retry_xtrigger(self, unix_time, submit_retry=False):
label = f'{"submit_" if submit_retry else ""}retry'
def _retry_task(self, itask, timer, submit_retry=False):
"""Retry a task.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
The task to retry.
timer (cylc.flow.task_action_timer.TaskActionTimer):
The [retry] timer to which triggered this retry.
submit_retry (bool):
False if this is an execution retry.
True if this is a submission retry.
"""
# devive an xtrigger label for this retry
label = '_'.join((
'cylc',
'submit_retry' if submit_retry else 'retry',
itask.identity
))
kwargs = {
'absolute_as_seconds': timer.timeout
}

# if this isn't the first retry the xtrigger will already exist
if label in itask.state.xtriggers:
# retry xtrigger already exists from a previous retry, modify it
xtrig = itask.state.xtriggers[label]
self.xtrigger_mgr.mutate_trig(label, kwargs)
else:
# create a new retry xtrigger
xtrig = SubFuncContext(
label,
'wall_clock',
[],
kwargs
)
self.xtrigger_mgr.add_trig(
label,
xtrig,
os.getenv("CYLC_SUITE_RUN_DIR")
)
itask.state.add_xtrigger(label)
itask.state.reset(TASK_STATUS_WAITING)


def _get_retry_xtrigger(self, itask, unix_time, submit_retry=False):
label = (
'cylc',
'submit_retry' if submit_retry else 'retry',
itask.identity
).join('_')
xtrig = SubFuncContext(
label,
'wall_clock',
Expand Down Expand Up @@ -723,15 +772,7 @@ def _process_message_failed(self, itask, event_time, message):
else:
# There is an execution retry lined up.
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
label, xtrig = self._get_retry_xtrigger(timer.timeout)
self.xtrigger_mgr.add_trig( # TODO centralise?
label,
xtrig,
os.getenv("CYLC_SUITE_RUN_DIR") # TODO?
)
itask.state.add_xtrigger(label)
itask.state.reset(TASK_STATUS_WAITING)

self._retry_task(itask, timer)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
if itask.state.is_held:
delay_msg = "held (%s)" % delay_msg
Expand Down Expand Up @@ -820,16 +861,7 @@ def _process_message_submit_failed(self, itask, event_time):
else:
# There is a submission retry lined up.
timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY]
label, xtrig = self._get_retry_xtrigger(
timer.timeout, submit_retry=True)
self.xtrigger_mgr.add_trig(
label,
xtrig,
os.getenv("CYLC_SUITE_RUN_DIR") # TODO?
)
itask.state.add_xtrigger(label)
itask.state.reset(TASK_STATUS_WAITING)

self._retry_task(itask, timer, submit_retry=True)
delay_msg = f"submit-retrying in {timer.delay_timeout_as_str()}"
if itask.state.is_held:
delay_msg = "held (%s)" % delay_msg
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ def _add_prerequisites(self, point, tdef):
def add_xtrigger(self, label, satisfied=False):
self.xtriggers[label] = satisfied

def get_xtrigger(self, label):
return self.xtriggers[label]

def _add_xtriggers(self, point, tdef):
"""Add task xtriggers valid for the current sequence.
Expand Down
74 changes: 38 additions & 36 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,32 +86,17 @@ class XtriggerManager(object):
(with same offset from cycle point) will be satisfied by the same function
call.
"""
Args:
suite (str): suite name
user (str): suite owner
broadcast_mgr (BroadcastMgr): the Broadcast Manager
proc_pool (SubProcPool): pool of Subprocesses
suite_run_dir (str): suite run directory
suite_share_dir (str): suite share directory
suite_work_dir (str): suite work directory
suite_source_dir (str): suite source directory
@staticmethod
def validate_xtrigger(fname: str, fdir: str):
"""Validate an Xtrigger function.
Args:
fname (str): function name
fdir(str): function directory
Raises:
ImportError: if the function module was not found
AttributeError: if the function was not found in the xtrigger
module
ValueError: if the function is not callable
"""
try:
func = get_func(fname, fdir)
except ImportError:
raise ImportError(
f"ERROR: xtrigger module '{fname}' not found")
except AttributeError:
raise AttributeError(
f"ERROR: '{fname}' not found in xtrigger module '{fname}'")
if not callable(func):
raise ValueError(
f"ERROR: '{fname}' not callable in xtrigger module '{fname}'")
"""

def __init__(
self,
Expand All @@ -125,17 +110,6 @@ def __init__(
suite_work_dir: str = None,
suite_source_dir: str = None,
):
"""Initialize the xtrigger manager.
Args:
suite (str): suite name
user (str): suite owner
broadcast_mgr (BroadcastMgr): the Broadcast Manager
proc_pool (SubProcPool): pool of Subprocesses
suite_run_dir (str): suite run directory
suite_share_dir (str): suite share directory
suite_source_dir (str): suite source directory
"""
# Suite function and clock triggers by label.
self.functx_map = {}
# When next to call a function, by signature.
Expand Down Expand Up @@ -163,6 +137,31 @@ def __init__(
self.broadcast_mgr = broadcast_mgr
self.suite_source_dir = suite_source_dir

@staticmethod
def validate_xtrigger(fname: str, fdir: str):
"""Validate an Xtrigger function.
Args:
fname (str): function name
fdir(str): function directory
Raises:
ImportError: if the function module was not found
AttributeError: if the function was not found in the xtrigger
module
ValueError: if the function is not callable
"""
try:
func = get_func(fname, fdir)
except ImportError:
raise ImportError(
f"ERROR: xtrigger module '{fname}' not found")
except AttributeError:
raise AttributeError(
f"ERROR: '{fname}' not found in xtrigger module '{fname}'")
if not callable(func):
raise ValueError(
f"ERROR: '{fname}' not callable in xtrigger module '{fname}'")

def add_trig(self, label: str, fctx: SubFuncContext, fdir: str):
"""Add a new xtrigger function.
Expand All @@ -189,6 +188,9 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str):
# Not a string arg.
pass

def mutate_trig(self, label, kwargs):
self.functx_map[label].func_kwargs.update(kwargs)

def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]):
"""Load satisfied xtrigger results from suite DB.
Expand Down

0 comments on commit 05a6f34

Please sign in to comment.