Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External trigger "plugin" functions. #2423

Merged
merged 18 commits into from
Jul 2, 2018
Merged

Conversation

hjoliver
Copy link
Member

@hjoliver hjoliver commented Sep 9, 2017

Close #2339. Close #1364

Trigger tasks off of anything by having the suite daemon call arbitrary "xtrigger functions" that return True/False and some result - e.g. path to a new external dataset - to broadcast to the cycle point dependent task.

This should replace in-task-proxy clock-triggers, client/server external triggers, and polling tasks.

  • xtrigger functions are called in the process pool with asynchronous callback (except for clock xtriggers which are guaranteed to be quick)
  • a function won't be called again before the previous call comes back
  • once an xtrigger is satisfied, the result is remembered and the function not called again
  • if a task has a clock xtrigger, don't bother calling other xtriggers until the clock time is up [not compatible with use of clock triggers in conditional expressions]
  • xtrigger functions are tracked by unique "call signature" (function name and args). If multiple tasks depend on the same xtrigger, it gets shared among them
  • function call frequency is configurable, default PT10S
  • variables like suite name, run directory, task cycle point, etc. can be substituted into function arg values via string templates (example below)
  • to extend functionality (trigger off of other external events) simply put a new function in /lib/cylc/xtriggers/ (or anywhere in sys.path) - and use it in the suite definition immediately

@hjoliver hjoliver added this to the soon milestone Sep 9, 2017
@hjoliver hjoliver self-assigned this Sep 9, 2017
@hjoliver
Copy link
Member Author

hjoliver commented Sep 9, 2017

I've implemented some xtrigger functions under lib/cylc/xtriggers/, with example suites under dev/suites/xtrigger/:

  • dummy(...) - a test trigger that sleeps while, and "fails" a few times before succeeding
  • wall_clock(...) - trigger off of the wall clock (can replace the in-task-proxy clock triggers)
  • suite_state(...) - trigger off of a task in another suite (can replace suite-state polling tasks)
  • file_exists(...) - trigger if a specific file exists
  • match_file(...) - trigger if a file is found to match some pattern (and report the matched file)
  • xrandom(..) - random trigger with configurable success rate.

[UPDATE: file triggering functions removed for the moment - a general solution to that problem is tricky]

@hjoliver
Copy link
Member Author

hjoliver commented Sep 11, 2017

Satisfied xtriggers are now stored and reloaded from the DB on restart, so the corresponding functions won't be called again (for the same function signature). This is needed because some xtriggers will only report in satisfied once. E.g. for triggering off of a message in a broker like RabbitMQ (but not Kafka) in which a message disappears once consumed; and for triggering off of detection of a new external dataset that does not have cycle point in the filename (e.g. satellite datasets) so the xtrigger function probably needs to rename the file on detection to ensure that it doesn't get re-detected in the next suite cycle.

@hjoliver hjoliver changed the title Pluggable in-daemon external trigger functions. External trigger "plugin" functions. Sep 12, 2017
@oliver-sanders
Copy link
Member

Looks good! Having external triggers appear in suite visualisations should make things much clearer.

This opens up using clock triggers with conditional statements which could be interesting e.g:

early | ( @clock_0 & data ) => process

In combination with #2334:

( ensemble:succeed-90% & @clock_0 ) | ( ensemble:succeed-80% & @clock_1 ) => process

Would it be possible to move a little more configuration into the graph by specifying selected arguments there? E.G:

[[[PT3H]]]
    graph = @clock => foo
[[[PT6H]]]
    graph = foo | @clock(+PT3H) => bar

@hjoliver
Copy link
Member Author

@oliver-sanders - good suggestions, I'll address them as soon as possible. Unfortunately these new triggers can't be used in conditional expressions yet, but I hope it is easy to do - it may need #2413

@hjoliver hjoliver force-pushed the ext-trig-plug branch 2 times, most recently from 3a7ac10 to ae67f59 Compare October 1, 2017 09:53
@hjoliver
Copy link
Member Author

hjoliver commented Oct 2, 2017

@oliver-sanders - after a bit of thought I'd prefer to leave your suggestions to possible future enhancements.

  • xtriggers in conditional expressions: this is obviously desirable, but I think it requires unifying the various trigger types Universal trigger handling #2413. This PR is still a significant improvement without that (and validation catches any attempt to do it)
  • allowing some xtrigger function args in the graph: this may be desirable as a small efficiency enhancement in suites with lots of xtriggers, as you'd only have to define the function call once. However, it doesn't really add more info to the graph if you use labels carefully (e.g. clock_PT0H => foo vs clock(PT0H) => foo) and on the downside it breaks the useful one-to-one mapping between labels and unique function signatures, which is used to qualify functional results in case a single task has multiple xtriggers of the same type, e.g.:
x1 = xtrig(file=a)
x2 = xtrig(file=b)
...
graph = "x1 & x2 => foo"

if the function xtrig(file=a) returns {name: 'bob'} and xtrig(file=b) returns {name: 'alice'} then the suite daemon will broadcast to task foo environment, x1_name=bob and x2_name=alice. If instead we had this:

x = xtrig()
...
graph = "x(file=a) & x(file=b) => foo"

then it's not clear how to pass function results on to triggered tasks.

@hjoliver
Copy link
Member Author

hjoliver commented Oct 5, 2017

Revised example suite:

[cylc]
    cycle point format = %Y
[scheduling]
    initial cycle point = 2020
    [[xtriggers]]
         suite_up = suite_state(suite=upstream, task=foo, point=%(point)s):PT10S
         clock_0 = wall_clock(offset=PT0H) 
   [[dependencies]]
        [[[P1Y]]]
           graph = "@clock_0 & @suite_up => FAM"
[runtime]
    [[FAM]]
    [[f_1, f_2, f_3, f_4, f_5]]
          inherit = FAM

Note:

  • xtrigger configuration is a direct representation of the function call.
  • The optional :INTERVAL in label = func(args):INTERVAL determines how frequently the suite daemon calls the function until it returns True.
  • clock triggers are treated differently - they are checked synchronously (not in the process pool with a callback), they don't have a configurable re-check delay, and the cycle point of the waiting task is automatically passed to the function.

bar

@hjoliver
Copy link
Member Author

hjoliver commented Oct 5, 2017

An interesting quirk to note: unlike tasks, xtriggers do not necessarily have an associated cycle point (hence they are not labeled with cycle point in the graph - see image above - but for convenience they are plotted per cycle point). Whether or not they are cycle-point specific depends on whether or not the cycle point of the dependent task is passed to the function (see the template %(point)s in the example suite above). The daemon groups xtriggers by unique function call signature, so if the cycle point does not appear in the argument list, then a single call will satisfy all cycle points as the suite runs. Similarly, if task name appears in the argument list (via template) xtrigger calls will be task-name specific (i.e. separate calls will be made to satisfy tasks with different names).

@hjoliver
Copy link
Member Author

hjoliver commented Oct 5, 2017

I will add some tests, then request a code review...

@hjoliver
Copy link
Member Author

hjoliver commented Oct 12, 2017

A note on the trigger function interface. The function signature is arbitrary, except that only keyword args are allowed. The suite-specific use of a trigger function is defined by the suite.rc syntax. Some suite and task variables (such as cycle point) are made available for use in the function args (in the suite.rc) - these are what can make a trigger cycle-point-, or task-name-, or whatever-, specific (because the template values are determined by the dependent task in each case). To illustrate, use of task name in the arg list below makes family members trigger at different random times off of (seemingly) the same xtrigger (in fact, on evaluation it creates 5 different xtriggers):

[scheduling]
   [[xtriggers]]
        rnd_by_name = xrandom(percent=50, secs=2, _=%(name)s)
   [[dependencies]]
        graph = "@rnd_by_name => FAM" 
[runtime]
   [[FAM]]
   [[f_1, f_2, f_3, f_4, f_5]]
         inherit = FAM

@hjoliver
Copy link
Member Author

hjoliver commented Nov 6, 2017

Branch rebased onto master.

@hjoliver hjoliver force-pushed the ext-trig-plug branch 3 times, most recently from 0507ef2 to 2c056c4 Compare November 6, 2017 22:01
@hjoliver
Copy link
Member Author

hjoliver commented Nov 6, 2017

@matthewrmshin @oliver-sanders - I think this is ready for review. I have not added user guide documentation yet in case review results in significant changes (although I'm pretty confident that the suite.rc interface is good) ... so let me know if the functionality and usage is not clear after reading the above comments, the docstring in lib/cylc/xtrigger_mgr.py, functions in lib/cylc/xtriggers/ and suites in /dev/suites/xtrigger/ and tests/xtriggers/.

@matthewrmshin
Copy link
Contributor

We can ignore Codacy's failure here. The SQL statement is safe in the context it is used - adding a class level constant to the statement. The use of pickle is also safe in the context it is used. (I wonder if we should switch to using JSON instead of pickle for (de-)serializing data? I don't think Codacy complains against JSON.)

@hjoliver
Copy link
Member Author

hjoliver commented Nov 7, 2017

Re codacy: yes, I guess JSoN is not a security risk like pickle, although slower (where that matters).

BTW I still intend to go through all issues flagged by codacy and "ignore" or address them (particularly security ones) ... but have not found the time for that yet.

Copy link
Contributor

@matthewrmshin matthewrmshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments.

bin/cylc-help Outdated
@@ -292,6 +292,7 @@ admin_commands['profile-battery'] = ['profile-battery']
admin_commands['import-examples'] = ['import-examples']
admin_commands['upgrade-run-dir'] = ['upgrade-run-dir']
admin_commands['check-software'] = ['check-software']
admin_commands['wrap-func'] = ['wrap-func']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More like call-func than wrap-func?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, "call" suggests a normal function call, doesn't it? This wraps a function into a minimal executable program, to execute in the process pool.

Copy link
Member Author

@hjoliver hjoliver Jun 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to function-run - less ambiguous, and no abbrev similarity to other commands (cylc warranty was a problem with wrap-func; and cylc run-func is a no-go thanks to cylc run).

bin/cylc-submit Outdated
db_mgr = SuiteDatabaseManager()
b_mgr = BroadcastMgr(db_mgr)
te_mgr = TaskEventsManager(suite, pool, db_mgr, b_mgr)
task_job_mgr = TaskJobManager(suite, pool, db_mgr, suite_srv_mgr, te_mgr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change still necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean exactly by "this change"? Oh, maybe just that the te_mgr and b_bgr variables are unnecessary as only used once? I'll change that...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func_name = sys.argv[1]
func_args = json.loads(sys.argv[2])
func_kwargs = json.loads(sys.argv[3])
run_func(func_name, func_args, func_kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be good to move the json-load => call function => json-dump logic to a location under lib/ (in the future?). (See also protocols such as JSON-RPC.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do (pref "for the future" as I haven't time to look at JSON-RPC right now)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've taken your advice on move into lib/cylc.

@@ -29,8 +29,9 @@
from isodatetime.parsers import TimePointParser, DurationParser

from cylc.cfgspec.utils import (
coerce_interval, coerce_interval_list, DurationFloat)
coerce_interval, coerce_xtrig, coerce_interval_list, DurationFloat)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may move this in #2609!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought you might 👍

IllegalValueError
)
IllegalValueError)
from cylc.mp_pool import SuiteFuncContext
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awful name SuiteFuncContext. Probably my fault with SuiteProcContext!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, and yes, just following the precedent!

import time
import json
import traceback
from signal import signal, alarm, SIGALRM
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New imports still used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll remove those. That was from the original function timeout mechanism, not needed now after your process pool refactor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -306,6 +309,13 @@ def put_task_event_timers(self, task_events_mgr):
"delay": timer.delay,
"timeout": timer.timeout})

def put_xtriggers(self, sat_xtrig):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

xclock_satisfied = False
else:
xclock_satisfied = True
return xclock_satisfied and all(self.xtriggers.values())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    return self.xclock and self.xclock[1] and all(self.xtriggers.values())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed (slightly differently - "None and bool" returns None)

if sig not in self.all_xtrig:
rem.append(sig)
for r in rem:
del self.sat_xtrig[r]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be?

for sig in list(self.sat_xtrig):
    if sig not in self.all_xtrig:
        del self.sat_xtrig[sig]

# OR:
self.sat_xtrig = [sig for sig in self.sat_xtrig if sig in self.all_xtrig]

Copy link
Member Author

@hjoliver hjoliver Jun 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. (The list comprehension doesn't work as sat_xtrig is a dict - and can't use dict comprehensions till we drop support for Python 2.6)

@hjoliver
Copy link
Member Author

Latest review comments addressed.

@hjoliver
Copy link
Member Author

Done (I hope). Documentation completed (I'm sure it could be improved, but I'm pretty happy with it).

Copy link
Contributor

@matthewrmshin matthewrmshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test OK in my environment.

@matthewrmshin
Copy link
Contributor

Good to have this in early for the next-release cycle.

Copy link
Member

@oliver-sanders oliver-sanders left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything seems in good shape and well documented. A few notes probably comments for future work, I don't actually care about spelling (second comment).


# Function return value to stdout: for compat with the command process pool.

import importlib
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you forgot to fetch my updates. This command has changed name to cylc-function-run, and the unused import removed.

@@ -3621,7 +3625,7 @@ \subsubsection{Trigger Types}
graph = "foo => bar"
\end{lstlisting}

{\em The sequential declaration is deprecated} however, in favour of explicit
{\em The sequential declaration is deprecated} however, in favor of explicit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm a conflicted mess about US-NZ/EN spelling. I lived in the US for 10 years, and US spelling tends to be used in software, so I tend to "correct" myself in both directions randomly. I suppose we should decided on one convention and stick to it.

cycle point:
{\em NOTE: this is the original Cylc clock-triggering mechanism in which the
time offset relative to cycle point is stored as an attribute of the tasks that
depend on it. It is deprecated in favour of the newer built in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to en/nz spelling again.

clock_1 = wall_clock(PT1H)
\end{lstlisting}

Finally, a zero-offset clock trigger does not need to be declared under
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice (fast show gif).

The return value of the \lstinline=suite_state= trigger function looks like
this:
\begin{lstlisting}
results = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A note for future work, it would be good if this interface could handle the (granted niche) situation where there might be more than one upstream task e.g:

    @global_model & @local_model => process

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can handle this already (if I understand your point correctly) because we split AND expressions into separate triggers. It's just conditional OR expressions that are currently off-limits for external triggers.


The following string templates are available for use, if the trigger function
needs any of this information, in function arguments in the suite configuration:
\begin{myitemize}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using string templates we could automatically provide kwargs if referenced.

It would be good if we could ditch the requirement for string templating. In the rose macro system user defined macros can include certain arguments in their signature. When rose calls said functions it inspects their signature and automatically provides such arguments is present.

For cylc that might mean the following function:

def my_xtrigger(user_parameter, dependent_id, point, debug=False):
    pass

Could potentially be called like so:

[scheduling]
    [[xtriggers]]
        my_trig = my_xtrigger('foo')

Rather than

[scheduling]
    [[xtriggers]]
        my_trig = my_xtrigger('foo', dependent_id='%(id)s', point='%(point)s')

The dependent task id, point and debug arguments being automatically provided by cylc as they are present in the function signature.

If a function does not implement any debug logic it can assume a signature omitting the debug arg:

def my_xtrigger(user_param):
    pass

Here is a crude example of automatically provided kwargs, note that a cleaner interface (to __code__) is available via the inspect module:

def foo(foo, bar, baz=1, pub=True):
    print 'foo: %s' % foo
    print 'bar: %s' % bar
    print 'baz: %s' % baz
    print 'pub: %s' % pub


data = {
    'bar': True,
    'baz': 42,
}


def call_function(function_name, data, args, kwargs):
    for arg in function_name.__code__.co_varnames:
        if arg in data:
            kwargs[arg] = data[arg]

    return function_name(*args, **kwargs)


call_function(foo, data, ('foot',), {})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the explanation I was looking for earlier. It does appear to be a better way, but it will have to be another PR at this point. I have at least done a quick fix to provide suite debug mode by string template, like the other suite and task attributes, instead of automatically adding a debug kwarg in suite debug mode (which breaks a function that does not take a debug arg).

\section{External Event Triggers}
\label{External Event Triggers}

{\em WARNING: this is a new capability - its suite configuration interface may
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should push the deprecation of the old style clock/external triggers until this interface has settled a bit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fair enough. I'll changed the text to say we don't recommend wholesale conversion until the interface has stabilized.

Copy link
Member

@oliver-sanders oliver-sanders left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this branch is already being used I'm OK with pushing this in now with the caveat that the interface may well change in the future (as documented). I'll leave you to merge.

@matthewrmshin
Copy link
Contributor

(The latest change looks OK visually, but given that we have got a bit of changes here, I'd like to run the test battery in my normal work environment to make sure it does not break.)

@hjoliver
Copy link
Member Author

hjoliver commented Jun 30, 2018

(@matthewrmshin - yes that's why I didn't hit "merge". To explain, the final changes are:

  • get_func (dynamic import by name) is not needed for clock triggers because the function name is fixed (wall_clock)
  • therefore get_func is only needed now in lib/cylc/mp_pool.py, so I moved it there
  • also get_func was not working for local xtrigger import from <suite-dir>/lib/python so I fixed that, and have now modified a test to ensure that this is tested
    • I'm not terribly fond the way I'm getting suite source dir into the process pool (a new cylc function-run command arg) but that detail is not exposed to users, so we can change later if there's a better way)
  • a couple more doc tweaks

- xclock doesn't need get_func (wall_clock function is built-in)
- fix xtrigger module import via get_func, for local lib/python
- xtrig debug mode via string template
- fix unnecessary lambda in kafka example.
- test: load local xtrigger automatically.
- final branch doc tweaks.
@hjoliver
Copy link
Member Author

hjoliver commented Jul 1, 2018

(conflicts fixed after merge from master - docs only).

@matthewrmshin
Copy link
Contributor

Many tests in the test battery are hanging in my environment. I'll try again and see what is going on.

@hjoliver
Copy link
Member Author

hjoliver commented Jul 2, 2018

Damn, that's weird, Travis CI was all good before the final doc tweak.

@hjoliver
Copy link
Member Author

hjoliver commented Jul 2, 2018

Tests OK on my VM (no remote tests, but that shouldn't matter on this branch).

@matthewrmshin
Copy link
Contributor

Ran test battery again, and it was all OK. The first attempt probably hit some local glitches.

@hjoliver
Copy link
Member Author

hjoliver commented Jul 2, 2018

The final issue flagged by Codacy is not relevant - this branch necessarily adds one more DB insert (and its construction from strings is not a problem in this context). Travis CI tests passed (final commit didn't run it, as docs only). MERGING...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants