Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xtrigger function arg validation. #5452

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5452.md
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support xtrigger argument validation.
18 changes: 14 additions & 4 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
InputError,
Expand All @@ -82,6 +83,7 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1711,10 +1713,8 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
Expand All @@ -1729,10 +1729,20 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]

# Call the xtrigger's validate_config function if it has one.
with suppress(AttributeError, ImportError):
get_func(xtrig.func_name, "validate_config", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr is None:
# Validation only.
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down
24 changes: 16 additions & 8 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_func(mod_name, func_name, src_dir):
"""Find and return a named function from a named module.

Can be in <src_dir>/lib/python, cylc.flow.xtriggers, or in Python path.
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved

These locations are checked in this order:
- <src_dir>/lib/python/
Expand All @@ -78,13 +80,17 @@ def get_func(func_name, src_dir):
Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.

Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module

"""
if func_name in _XTRIG_FUNCS:
return _XTRIG_FUNCS[func_name]
if (mod_name, func_name) in _XTRIG_FUNCS:
# Found and cached already.
return _XTRIG_FUNCS[(mod_name, func_name)]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
except ImportError:
Expand All @@ -101,18 +107,20 @@ def get_func(func_name, src_dir):
raise

try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
_XTRIG_FUNCS[(mod_name, func_name)] = getattr(mod_by_name, func_name)
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
return _XTRIG_FUNCS[func_name]
return _XTRIG_FUNCS[(mod_name, func_name)]


def run_function(func_name, json_args, json_kwargs, src_dir):
"""Run a Python function in the process pool.

func_name(*func_args, **func_kwargs)

The function is presumed to be in a module of the same name.

Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.
Expand All @@ -121,7 +129,7 @@ def run_function(func_name, json_args, json_kwargs, src_dir):
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)
# Find and import then function.
func = get_func(func_name, src_dir)
func = get_func(func_name, func_name, src_dir)
# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,9 @@ def validate_xtrigger(

"""
fname: str = fctx.func_name

try:
func = get_func(fname, fdir)
func = get_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
Expand Down
31 changes: 31 additions & 0 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from cylc.flow.cycling.iso8601 import interval_parse
from cylc.flow.exceptions import WorkflowConfigError


def wall_clock(trigger_time=None):
Expand All @@ -27,3 +29,32 @@
Trigger time as seconds since Unix epoch.
"""
return time() > trigger_time


def validate_config(f_args, f_kwargs, f_signature):
"""Validate and manipulate args parsed from the workflow config.

wall_clock() # zero offset
wall_clock(PT1H)
wall_clock(offset=PT1H)

The offset must be a valid ISO 8601 interval.

If f_args used, convert to f_kwargs for clarity.

"""
n_args = len(f_args)
n_kwargs = len(f_kwargs)

Check warning on line 47 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L46-L47

Added lines #L46 - L47 were not covered by tests

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"xtrigger: too many args: {f_signature}")

Check warning on line 50 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L50

Added line #L50 was not covered by tests

if n_args:
f_kwargs["offset"] = f_args[0]

Check warning on line 53 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L53

Added line #L53 was not covered by tests
elif not n_kwargs:
f_kwargs["offset"] = "P0Y"

Check warning on line 55 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L55

Added line #L55 was not covered by tests

try:
interval_parse(f_kwargs["offset"])
except ValueError:
raise WorkflowConfigError(f"xtrigger: invalid offset: {f_signature}")

Check warning on line 60 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L57-L60

Added lines #L57 - L60 were not covered by tests
21 changes: 11 additions & 10 deletions tests/unit/test_subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ def test_xfunction(self):
with the_answer_file.open(mode="w") as f:
f.write("""the_answer = lambda: 42""")
f.flush()
fn = get_func("the_answer", temp_dir)
f_name = "the_answer"
fn = get_func(f_name, f_name, temp_dir)
result = fn()
self.assertEqual(42, result)

Expand All @@ -166,19 +167,18 @@ def test_xfunction_cache(self):
python_dir.mkdir(parents=True)
amandita_file = python_dir / "amandita.py"
with amandita_file.open(mode="w") as f:
f.write("""amandita = lambda: 'chocolate'""")
f.write("""choco = lambda: 'chocolate'""")
f.flush()
fn = get_func("amandita", temp_dir)
m_name = "amandita" # module
f_name = "choco" # function
fn = get_func(m_name, f_name, temp_dir)
result = fn()
self.assertEqual('chocolate', result)

# is in the cache
self.assertTrue('amandita' in _XTRIG_FUNCS)
self.assertTrue((m_name, f_name) in _XTRIG_FUNCS)
# returned from cache
self.assertEqual(fn, get_func("amandita", temp_dir))
del _XTRIG_FUNCS['amandita']
# is not in the cache
self.assertFalse('amandita' in _XTRIG_FUNCS)
self.assertEqual(fn, get_func(m_name, f_name, temp_dir))

def test_xfunction_import_error(self):
"""Test for error on importing a xtrigger function.
Expand All @@ -189,7 +189,7 @@ def test_xfunction_import_error(self):
"""
with TemporaryDirectory() as temp_dir:
with self.assertRaises(ModuleNotFoundError):
get_func("invalid-module-name", temp_dir)
get_func("invalid-module-name", "func-name", temp_dir)

def test_xfunction_attribute_error(self):
"""Test for error on looking for an attribute in a xtrigger script."""
Expand All @@ -200,8 +200,9 @@ def test_xfunction_attribute_error(self):
with the_answer_file.open(mode="w") as f:
f.write("""the_droid = lambda: 'excalibur'""")
f.flush()
f_name = "the_sword"
with self.assertRaises(AttributeError):
get_func("the_sword", temp_dir)
get_func(f_name, f_name, temp_dir)


@pytest.fixture
Expand Down
Loading