Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for event-triggered workflows #1271

Merged
merged 45 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8902299
WIP: Integrate with Argo Events
savingoyal Feb 13, 2023
f1fc273
events
savingoyal Feb 28, 2023
a104866
auto-emit events
savingoyal Mar 1, 2023
fdff141
address sensor deregistration
savingoyal Mar 1, 2023
6521cf0
stuff more information in auto events
savingoyal Mar 1, 2023
d3cf7d2
changes
savingoyal Mar 1, 2023
24b4c99
mo event changes
savingoyal Mar 5, 2023
78bc0e5
flip parameter mappings
savingoyal Mar 5, 2023
a07b758
introduce trigger metadata in current
savingoyal Mar 5, 2023
b32dc9b
black cleanup
savingoyal Mar 5, 2023
1a8a96a
emit triggering metadata to db
savingoyal Mar 5, 2023
f0e1a46
support trigger_on_finish
savingoyal Mar 5, 2023
6e20849
set event ids
savingoyal Mar 5, 2023
02432a4
introduce metaflow trigger
savingoyal Mar 5, 2023
b4f50a8
block sfn and airflow for @trigger
savingoyal Mar 5, 2023
5aa605f
introduce options
savingoyal Mar 5, 2023
51dd683
fix timestamp return type and add checks
savingoyal Mar 5, 2023
0c25574
more checks
savingoyal Mar 5, 2023
16ee555
fix namespaces
savingoyal Mar 6, 2023
20701f3
more changes
savingoyal Mar 6, 2023
116054c
more ux polish
savingoyal Mar 6, 2023
de0b37c
address comments
savingoyal Mar 14, 2023
896af5b
remove references to AND
savingoyal Mar 14, 2023
8df4f4b
address feedback for metaflow trigger
savingoyal Mar 14, 2023
8f8117f
address client changes
savingoyal Mar 14, 2023
2ca19c1
add config vars for webhooks
savingoyal Mar 14, 2023
19689f4
updates to event triggering
savingoyal Mar 28, 2023
09e8582
more changes
savingoyal Apr 11, 2023
3251a67
changes
savingoyal Apr 17, 2023
41c59d3
address feedback
savingoyal Apr 24, 2023
92df095
address changes
savingoyal Apr 24, 2023
129509e
add docs
savingoyal Apr 24, 2023
a3ef4c5
clean up trigger
savingoyal Apr 24, 2023
e20ab00
introduce window
savingoyal Apr 24, 2023
36dd508
remove hardcodes
savingoyal Apr 24, 2023
a5893fe
address pending comments
savingoyal Apr 24, 2023
37977ee
add todo comments
savingoyal Apr 24, 2023
ab76cce
move back event names
savingoyal Apr 24, 2023
d2ecb1d
clean up code
savingoyal Apr 24, 2023
62f7089
address feedback
savingoyal Apr 25, 2023
4dee37a
support prod namespace
savingoyal Apr 25, 2023
489f735
address comments
savingoyal Apr 25, 2023
797aa6d
address bugs
savingoyal Apr 25, 2023
af232ab
final fix
savingoyal Apr 25, 2023
df1378e
fix: event triggering docs (#1372)
saikonen Apr 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
from __future__ import print_function
from datetime import datetime

import json
import os
import tarfile
import json
from io import BytesIO
from collections import namedtuple
from datetime import datetime
from io import BytesIO
from itertools import chain
from typing import (
Any,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Tuple,
)
from typing import Any, Dict, FrozenSet, Iterable, List, Optional, Tuple

from metaflow.metaflow_environment import MetaflowEnvironment
from metaflow.current import current
from metaflow.events import Trigger
from metaflow.exception import (
MetaflowNotFound,
MetaflowNamespaceMismatch,
MetaflowInternalError,
MetaflowInvalidPathspec,
MetaflowNamespaceMismatch,
MetaflowNotFound,
)
from metaflow.includefile import IncludedFile
from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS
from metaflow.metaflow_environment import MetaflowEnvironment
from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
from metaflow.unbounded_foreach import CONTROL_TASK_TAG
from metaflow.util import cached_property, resolve_identity, to_unicode, is_stringish
from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode

from .filecache import FileCache
from .. import INFO_FILE
from .filecache import FileCache

try:
# python2
Expand Down Expand Up @@ -356,7 +350,8 @@ def __iter__(self) -> Iterable["MetaflowObject"]:
Iterator over all children
"""
query_filter = {}
# skip namespace filtering if _namespace_check is False

# skip namespace filtering if _namespace_check is unset.
if self._namespace_check and current_namespace:
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
query_filter = {"any_tags": current_namespace}

Expand Down Expand Up @@ -1901,6 +1896,24 @@ def replace_tags(self, tags_to_remove: Iterable[str], tags_to_add: Iterable[str]
self._user_tags = frozenset(final_user_tags)
self._tags = frozenset([*self._user_tags, *self._system_tags])

@property
def trigger(self) -> Optional[Trigger]:
"""
Returns a container of events that triggered this run.

This returns None if the run was not triggered by any events.

Returns
-------
Trigger, optional
Container of triggering events
"""
if "start" in self:
meta = self["start"].task.metadata_dict.get("execution-triggers")
if meta:
return Trigger(json.loads(meta))
return None

savingoyal marked this conversation as resolved.
Show resolved Hide resolved

class Flow(MetaflowObject):
"""
Expand Down
162 changes: 162 additions & 0 deletions metaflow/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from collections import OrderedDict, namedtuple
from datetime import datetime

MetaflowEvent = namedtuple("MetaflowEvent", ["name", "id", "timestamp", "type"])
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
MetaflowEvent.__doc__ = """
Container of metadata that identifies the event that triggered
the `Run` under consideration.

Attributes
----------
name : str
name of the event.
id : str
unique identifier for the event.
timestamp : datetime
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
timestamp recording creation time for the event.
type : str
type for the event - one of `event` or `run`
"""


class Trigger(object):
"""
Defines a container of event triggers' metadata.

"""

def __init__(self, _meta=None):
if _meta is None:
_meta = []

_meta.sort(key=lambda x: x.get("timestamp") or float("-inf"), reverse=True)

self._runs = None
self._events = [
MetaflowEvent(
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
**{
**obj,
# Add timestamp as datetime. Guaranteed to exist for Metaflow
# events - best effort for everything else.
**(
{"timestamp": datetime.fromtimestamp(obj["timestamp"])}
if obj.get("timestamp")
and isinstance(obj.get("timestamp"), int)
else {}
),
}
)
for obj in _meta
]

@classmethod
def from_runs(cls, run_objs):
run_objs.sort(key=lambda x: x.finished_at, reverse=True)
trigger = Trigger(
[
{
"type": "run",
"timestamp": run_obj.finished_at,
"name": "metaflow.%s.%s" % (run_obj.parent.id, run_obj["end"].id),
"id": run_obj.end_task.pathspec,
}
for run_obj in run_objs
]
)
trigger._runs = run_objs
return trigger

@property
def event(self):
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
"""
The `MetaflowEvent` object corresponding to the triggering event.

If multiple events triggered the run, this property is the latest event.

Returns
-------
MetaflowEvent, optional
The latest event that triggered the run, if applicable.
"""
return next(iter(self._events), None)

@property
def events(self):
"""
The list of `MetaflowEvent` objects correspondings to all the triggering events.

Returns
-------
List[MetaflowEvent], optional
List of all events that triggered the run
"""
return list(self._events) or None

@property
def run(self):
"""
The corresponding `Run` object if the triggering event is a Metaflow run.

In case multiple runs triggered the run, this property is the latest run.
Returns `None` if none of the triggering events are a `Run`.

Returns
-------
Run, optional
Latest Run that triggered this run, if applicable.
"""
if self._runs is None:
self.runs
return next(iter(self._runs), None)

@property
def runs(self):
"""
The list of `Run` objects in the triggering events.
Returns `None` if none of the triggering events are `Run` objects.

Returns
-------
List[Run], optional
List of runs that triggered this run, if applicable.
"""
if self._runs is None:
# to avoid circular import
from metaflow import Run

self._runs = [
Run(
# object id is the task pathspec for events that map to run
obj.id[: obj.id.index("/", obj.id.index("/") + 1)],
_namespace_check=False,
)
for obj in self._events
if obj.type == "run"
]

return list(self._runs) or None

def __getitem__(self, key):
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
"""
If triggering events are runs, `key` corresponds to the flow name of the triggering run. Returns a triggering `Run` object corresponding to the key. If triggering events are not runs, `key` corresponds to the event name and a `MetaflowEvent` object is returned.
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
"""
if self.runs:
savingoyal marked this conversation as resolved.
Show resolved Hide resolved
for run in self.runs:
if run.path_components[0] == key:
return run
elif self.events:
for event in self.events:
if event.name == key:
return event
raise KeyError(key)

def __iter__(self):
if self.events:
return iter(self.events)
return iter([])

def __contains__(self, id):
try:
return bool(self.__getitem__(id))
except KeyError:
return False
4 changes: 3 additions & 1 deletion metaflow/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from metaflow.extension_support.integrations import process_integration_aliases

# To enable an alias `metaflow.alias.get_s3_client` to
# To enable an alias `metaflow.integrations.get_s3_client` to
# `metaflow.plugins.aws.aws_client.get_aws_client`, use the following:
#
# ALIASES_DESC = [("get_s3_client", ".plugins.aws.aws_client.get_aws_client")]
Expand All @@ -17,6 +17,8 @@
# - name: name of the integration alias
# - obj: object it points to
#
ALIASES_DESC = [("ArgoEvent", ".plugins.argo.argo_events.ArgoEvent")]

# Aliases can be enabled or disabled through configuration or extensions:
# - ENABLED_INTEGRATION_ALIAS: list of alias names to enable.
# - TOGGLE_INTEGRATION_ALIAS: if ENABLED_INTEGRATION_ALIAS is not set anywhere
Expand Down
10 changes: 10 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@
ARGO_WORKFLOWS_KUBERNETES_SECRETS = from_conf("ARGO_WORKFLOWS_KUBERNETES_SECRETS", "")
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP = from_conf("ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", "")

##
# Argo Events Configuration
##
ARGO_EVENTS_SERVICE_ACCOUNT = from_conf("ARGO_EVENTS_SERVICE_ACCOUNT")
ARGO_EVENTS_EVENT_BUS = from_conf("ARGO_EVENTS_EVENT_BUS", "default")
ARGO_EVENTS_EVENT_SOURCE = from_conf("ARGO_EVENTS_EVENT_SOURCE")
ARGO_EVENTS_EVENT = from_conf("ARGO_EVENTS_EVENT")
ARGO_EVENTS_WEBHOOK_URL = from_conf("ARGO_EVENTS_WEBHOOK_URL")


##
# Airflow Configuration
##
Expand Down
21 changes: 12 additions & 9 deletions metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from metaflow.extension_support.plugins import (
process_plugins,
merge_lists,
process_plugins,
resolve_plugins,
)

Expand Down Expand Up @@ -56,6 +56,8 @@
("conda_base", ".conda.conda_flow_decorator.CondaFlowDecorator"),
("schedule", ".aws.step_functions.schedule_decorator.ScheduleDecorator"),
("project", ".project_decorator.ProjectDecorator"),
("trigger", ".events_decorator.TriggerDecorator"),
("trigger_on_finish", ".events_decorator.TriggerOnFinishDecorator"),
]

# Add environments here
Expand Down Expand Up @@ -137,28 +139,29 @@ def get_plugin_cli():
AWS_CLIENT_PROVIDERS = resolve_plugins("aws_client_provider")
SECRETS_PROVIDERS = resolve_plugins("secrets_provider")

from .cards.card_modules import MF_EXTERNAL_CARDS

# Cards; due to the way cards were designed, it is harder to make them fit
# in the resolve_plugins mechanism. This should be OK because it is unlikely that
# cards will need to be *removed*. No card should be too specific (for example, no
# card should be something just for Airflow, or Argo or step-functions -- those should
# be added externally).
from .cards.card_modules.basic import (
DefaultCard,
TaskSpecCard,
ErrorCard,
BlankCard,
DefaultCard,
DefaultCardJSON,
ErrorCard,
TaskSpecCard,
)
from .cards.card_modules.test_cards import (
TestErrorCard,
TestTimeoutCard,
TestMockCard,
TestPathSpecCard,
TestEditableCard,
TestEditableCard2,
TestErrorCard,
TestMockCard,
TestNonEditableCard,
TestPathSpecCard,
TestTimeoutCard,
)
from .cards.card_modules import MF_EXTERNAL_CARDS

CARDS = [
DefaultCard,
Expand Down
Loading