xtriggers: re-implement as async functions #3497

Open
4 tasks done
oliver-sanders opened this issue Feb 5, 2020 · 8 comments
Labels
efficiency For notable efficiency improvements
Comments


oliver-sanders commented Feb 5, 2020

Supersedes the same idea from #2917

The Problem:

At the moment XTriggers are run by the subprocess pool, with the following consequences:

  • Each XTrigger call involves a subprocess call to a script.
  • Python is invoked for each xtrigger call.
  • XTriggers are re-imported for each xtrigger call.
  • Missing XTriggers can cause runtime errors.

Bodging the XTrigger loading mechanism for #3465 seems nasty.

Close #2917

The Proposal:

Convert XTriggers to asynchronous functions and call them directly from the main loop, piggy-backing on the main loop plugin functionality introduced in #3492.

This will turn XTriggers into a blocking stage of the main loop (where they would effectively be running in an unlimited thread pool), something like this:

async def main_loop(self):
    # ...
    # run all xtriggers concurrently and wait for them all to return
    await asyncio.gather(*[
        trigger(args)
        for trigger in xtriggers
    ])
    # ...
    await asyncio.sleep(interval)  # asyncio.sleep requires a delay argument

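For illustration, here is a self-contained, runnable version of the gather pattern above (the xtrigger names and the `(arg, satisfied)` return shape are made up for the sketch):

```python
import asyncio

async def xtrigger_one(arg):
    # stand-in xtrigger: pretend to check something, then report success
    await asyncio.sleep(0)
    return (arg, True)

async def main_loop():
    xtriggers = [xtrigger_one(n) for n in range(3)]
    # all xtriggers run concurrently; gather preserves input order
    return await asyncio.gather(*xtriggers)

results = asyncio.run(main_loop())
```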
Consequences:

  • This should be an efficiency boost as we are calling the XTrigger directly rather than going through SubProcPool, Subprocess, Python and importlib.
  • XTriggers would not be re-imported whilst the suite is running; if you want to change XTrigger code you need to restart your suite.
  • XTriggers would get loaded on suite start-up, so missing xtriggers could not cause runtime errors.
  • XTriggers gain the logging, timing and standardisation offered by main loop plugins.
  • The responsibility for performing non-blocking IO lies with the XTrigger function writer.

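On the last point: an xtrigger author with unavoidably blocking IO could still keep the main loop responsive by off-loading the blocking part, e.g. (a sketch; `blocking_check` and the `(satisfied, results)` return shape are assumptions):

```python
import asyncio
import time

def blocking_check():
    # e.g. a slow filesystem or network check that would block the event loop
    time.sleep(0.01)
    return True

async def my_xtrigger(*args):
    # off-load the blocking call to a thread so the scheduler's
    # event loop keeps running while we wait
    loop = asyncio.get_running_loop()
    satisfied = await loop.run_in_executor(None, blocking_check)
    return satisfied, {}

result = asyncio.run(my_xtrigger())
```
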
Bonus Marks:

If we move to asynchronous XTriggers we gain the ability to have long-running XTrigger functions with minimal overheads.

This means we can very easily and efficiently implement a push interface for XTriggers (in addition to the pre-existing pull interface):

async def push(*args):
    socket = ...
    msg = await socket.recv()  # could take hours, doesn't matter
    return json.loads(msg)

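To show the waiting really is cheap, here is a runnable variant using an `asyncio.Queue` as a stand-in for the socket (all names hypothetical):

```python
import asyncio
import json

async def push(queue):
    # stand-in for socket.recv(): wait as long as it takes for one message
    msg = await queue.get()
    return json.loads(msg)

async def demo():
    queue = asyncio.Queue()
    trigger = asyncio.ensure_future(push(queue))
    # the main loop is free to do other work while the trigger waits...
    await asyncio.sleep(0)
    # ...until a message is "pushed" from outside
    await queue.put('{"cycle": "20200101T00"}')
    return await trigger

data = asyncio.run(demo())
```
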
Super Bonus Marks:

Another long-lived interface which would also be fairly straightforward to implement is 'yield', i.e. a single long-lived asynchronous function which yields cycle points and data as and when they become available:

async def yields(*args):
    socket = ...
    while True:
        # listen for messages
        msg = await socket.recv()
        data = json.loads(msg)
        # yield XTrigger values to Cylc Flow when they arrive
        yield (
            data['time'],  # cycle point
            data
        )

A nicer solution for Kafka / message brokers?

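Consuming such an async generator from the scheduler side could look something like this (runnable sketch with canned messages standing in for a broker subscription):

```python
import asyncio
import json

async def yields(messages):
    # stand-in for a message broker subscription: yield (cycle_point, data)
    for msg in messages:
        await asyncio.sleep(0)  # simulate waiting on socket.recv()
        data = json.loads(msg)
        yield (data['time'], data)

async def consume():
    points = []
    # the scheduler reacts to each (cycle point, data) pair as it arrives
    async for cycle_point, data in yields(
        ['{"time": "20200101T00"}', '{"time": "20200101T06"}']
    ):
        points.append(cycle_point)
    return points

points = asyncio.run(consume())
```
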
Hyper bonus marks:

Once you’ve made it as far as a yield interface you have effectively achieved the cycle driver interface I’ve been harping on about for the last two years. Use coroutines to provide a cycling interface rather than bodging external triggers into a cycling regime which doesn’t fit; they can become a first-class cycling object in their own right:

async def drives(*args):
    while True:
        data = await event()
        # kicks off a new cycle point at time()
        # this provides a solution to observation-type workflows
        # where data doesn't arrive on regular sequences
        yield (time(), data)

This is a little more involved, requiring a major abstraction of the cycling interface and classes, and is beyond the scope of Cylc 8, but worth keeping in mind so as to keep doors open.

Questions:

  • Push interface a good idea? The overhead is just one thread per active XTrigger.
  • Yield interface a good idea? Really nice for some applications. Awesome USP for Cylc.
  • Is there any reason we would want XTriggers to be re-imported whilst the suite is running?
  • Are we happy with an async implementation?

The backbone of the work has already been done with main loop plugins (XTriggers are just a special case), but are there any hitches, @cylc/core?

Pull requests welcome!

@oliver-sanders oliver-sanders added the question Flag this as a question for the next Cylc project meeting. label Feb 5, 2020
@oliver-sanders oliver-sanders added this to the cylc-8.0.0 milestone Feb 5, 2020
@oliver-sanders oliver-sanders self-assigned this Feb 5, 2020
@oliver-sanders oliver-sanders added the efficiency For notable efficiency improvements label Feb 5, 2020

hjoliver commented Mar 19, 2020

Supersedes #2917 !

@oliver-sanders

Didn't spot that!

@dwsutherland

I think this is blocking (hence why WFS => UIS is run in the thread pool executor):

async def push(*args):
    socket = ...
    msg = await socket.recv()  # could take hours, doesn't matter
    return json.loads(msg)

However, there's an option to make socket.recv() non-blocking (then you can loop over it like a queue). And I hadn't tried asyncio.gather(iterable_of_asyncfuncs) because the iterable_of_asyncfuncs changed size for the UIS (amongst other things).

I may be wrong(?).


oliver-sanders commented Mar 20, 2020

There'll be a way of making it non-blocking, it's an untested illustrative example.

[edit] I think this example is non-blocking, providing your socket implementation doesn't have additional blocking parts. As an aside, putting the xtrigger runner into another process wouldn't be a bad shout, to reduce the potential for xtriggers blocking the Scheduler's main loop, similar to how they can currently block the subprocpool's main loop by saturating the pool.

@oliver-sanders oliver-sanders removed the question Flag this as a question for the next Cylc project meeting. label Aug 4, 2020
@oliver-sanders oliver-sanders changed the title XTriggers: re-implement as async functions xtriggers: re-implement as async functions Oct 29, 2020
@oliver-sanders oliver-sanders modified the milestones: cylc-8.0.0, 8.x Jun 25, 2021
@hjoliver

Comment from a NIWA colleague:

#3497: looks good, especially if taken as far as the yield interface, which would allow Cylc external triggering via something like RabbitMQ etc. This would allow multiple Cylc suites to collaborate using a pub/sub pattern rather than the current inter-suite polling.

@oliver-sanders

Discussed another potential use case here recently where an integer cycling workflow is driven by an external system.

@oliver-sanders oliver-sanders removed their assignment Feb 22, 2022
@oliver-sanders

Note, currently the only way to achieve external event-driven cycling in Cylc is to use (deprecated?) ext-triggers (not to be mistaken for xtriggers).

# An example of a CI-type workflow which kicks off a new cycle of tasks every time
# an external trigger is received.

# Start the workflow as normal:
# $ cylc vip <path/to/this/workflow>

# Then kick off a cycle specifying any desired environment variables e.g:
# $ ./bin/trigger <workflow-id> WORLD=earth

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    runahead limit = P5  # max number of cycles which can run in parallel
    [[special tasks]]
        # register the external trigger and tell it which task to run
        external-trigger = build("build")
    [[graph]]
        P1 = """
            build => a => b => c
        """

[runtime]
    [[build, a, b, c]]
        script = """
            echo "Hello $CYLC_TASK_NAME!"
        """


oliver-sanders commented Jun 14, 2024

An idea I've been mulling for a while, for discussion when this issue is picked up...

Implement these async xtriggers as tasks, not xtriggers, i.e.:

[runtime]
  [[@file_watcher]]  # @ prefix not necessary
    run mode = xtrigger  # using Tim's lovely new "run mode" feature developed for skip mode
    [[[xtrigger]]]
      function = cylc.site_extensions.ops_file_watcher
      # xtrigger args
      cycle = ops/global//<cycle>
      mode = operational
    [[[environment]]]
      ANSWER = 42
    [[[outputs]]]
      file1 = file1
      file2 = file2
      file3 = file3

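For concreteness, the function behind such a task might look like this (entirely hypothetical: `ops_file_watcher`, its signature and its `(satisfied, outputs)` return shape are invented to match the config sketch above):

```python
import asyncio
import os
import tempfile

async def ops_file_watcher(cycle, mode, paths, interval=0.01):
    # hypothetical async xtrigger backing the task above: wait until every
    # watched path exists, then report one output per file
    while not all(os.path.exists(p) for p in paths):
        await asyncio.sleep(interval)
    return True, {f'file{i}': p for i, p in enumerate(paths, start=1)}

# demo: watch a file which already exists
with tempfile.NamedTemporaryFile() as tmp:
    satisfied, outputs = asyncio.run(
        ops_file_watcher('ops/global//1', 'operational', [tmp.name])
    )
```
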
Why:

  • Tasks can have configuration (e.g. env vars) which can be broadcast, allowing dynamically configured xtriggers.
  • Tasks can have (multiple) outputs e.g. one xtrigger can monitor multiple things.
  • Tasks can be triggered / removed / suicide-triggered.
  • Tasks can be set / killed / polled (may open the door to remote xtriggers).
  • Task outputs can be used in conditional expressions.
  • Tasks can be parametrised.
  • Task spawning is easier to understand (or at least more likely to be understood by users); sequential xtriggers can be achieved in the usual way (no need for an alternative mechanism).

What this doesn't solve (passively) is event-driven cycling (see 1, 2). This is a problem which async xtriggers would be perfectly capable of solving but which is currently only possible via ext-triggers. A solution which leaves this door open would be highly desirable:

[scheduling]
  [[graph]]
    @file_watcher = """
      foo => bar => baz
    """

But I haven't got any good suggestions for that ATM.
