From daef4fe14a6cd5530299fca275a9d328d933f26e Mon Sep 17 00:00:00 2001 From: Savin Date: Tue, 25 Apr 2023 15:09:30 -0700 Subject: [PATCH] Introduce support for event-triggered workflows (#1271) * WIP: Integrate with Argo Events * events * auto-emit events * address sensor deregistration * stuff more information in auto events * changes * mo event changes * flip parameter mappings * introduce trigger metadata in current * black cleanup * emit triggering metadata to db * support trigger_on_finish * set event ids * introduce metaflow trigger * block sfn and airflow for @trigger * introduce options * fix timestamp return type and add checks * more checks * fix namespaces * more changes * more ux polish * address comments * remove references to AND * address feedback for metaflow trigger * address client changes * add config vars for webhooks * updates to event triggering * more changes * changes * address feedback * address changes * add docs * clean up trigger * introduce window * remove hardcodes * address pending comments * add todo comments * move back event names * clean up code * address feedback * support prod namespace * address comments * address bugs * final fix * fix: event triggering docs (#1372) * explicitly return None for trigger * address review comments * polishing events docstrings * update event decorator types in docstring --------- Co-authored-by: Sakari Ikonen <64256562+saikonen@users.noreply.github.com> --- metaflow/client/core.py | 49 +- metaflow/events.py | 162 +++ metaflow/integrations.py | 4 +- metaflow/metaflow_config.py | 10 + metaflow/plugins/__init__.py | 21 +- metaflow/plugins/airflow/airflow.py | 58 +- metaflow/plugins/airflow/airflow_cli.py | 20 +- metaflow/plugins/argo/argo_client.py | 85 +- metaflow/plugins/argo/argo_events.py | 98 ++ metaflow/plugins/argo/argo_workflows.py | 945 ++++++++++++++++-- metaflow/plugins/argo/argo_workflows_cli.py | 32 +- .../plugins/argo/argo_workflows_decorator.py | 86 ++ metaflow/plugins/argo/process_input_paths.py | 2 +- .../aws/step_functions/step_functions.py | 41 +- metaflow/plugins/events_decorator.py | 298 ++++++ metaflow/plugins/kubernetes/kubernetes.py | 25 +- 16 files changed, 1757 insertions(+), 179 deletions(-) create mode 100644 metaflow/events.py create mode 100644 metaflow/plugins/argo/argo_events.py create mode 100644 metaflow/plugins/events_decorator.py diff --git a/metaflow/client/core.py b/metaflow/client/core.py index 4ffaca05e21..38170f656a6 100644 --- a/metaflow/client/core.py +++ b/metaflow/client/core.py @@ -1,37 +1,31 @@ from __future__ import print_function -from datetime import datetime + +import json import os import tarfile -import json -from io import BytesIO from collections import namedtuple +from datetime import datetime +from io import BytesIO from itertools import chain -from typing import ( - Any, - Dict, - FrozenSet, - Iterable, - List, - Optional, - Tuple, -) +from typing import Any, Dict, FrozenSet, Iterable, List, Optional, Tuple -from metaflow.metaflow_environment import MetaflowEnvironment from metaflow.current import current +from metaflow.events import Trigger from metaflow.exception import ( - MetaflowNotFound, - MetaflowNamespaceMismatch, MetaflowInternalError, MetaflowInvalidPathspec, + MetaflowNamespaceMismatch, + MetaflowNotFound, ) from metaflow.includefile import IncludedFile from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS +from metaflow.metaflow_environment import MetaflowEnvironment from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS from 
metaflow.unbounded_foreach import CONTROL_TASK_TAG -from metaflow.util import cached_property, resolve_identity, to_unicode, is_stringish +from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode -from .filecache import FileCache from .. import INFO_FILE +from .filecache import FileCache try: # python2 @@ -356,7 +350,8 @@ def __iter__(self) -> Iterable["MetaflowObject"]: Iterator over all children """ query_filter = {} - # skip namespace filtering if _namespace_check is False + + # skip namespace filtering if _namespace_check is unset. if self._namespace_check and current_namespace: query_filter = {"any_tags": current_namespace} @@ -1901,6 +1896,24 @@ def replace_tags(self, tags_to_remove: Iterable[str], tags_to_add: Iterable[str] self._user_tags = frozenset(final_user_tags) self._tags = frozenset([*self._user_tags, *self._system_tags]) + @property + def trigger(self) -> Optional[Trigger]: + """ + Returns a container of events that triggered this run. + + This returns None if the run was not triggered by any events. + + Returns + ------- + Trigger, optional + Container of triggering events + """ + if "start" in self: + meta = self["start"].task.metadata_dict.get("execution-triggers") + if meta: + return Trigger(json.loads(meta)) + return None + class Flow(MetaflowObject): """ diff --git a/metaflow/events.py b/metaflow/events.py new file mode 100644 index 00000000000..ef6d216bdac --- /dev/null +++ b/metaflow/events.py @@ -0,0 +1,162 @@ +from collections import OrderedDict, namedtuple +from datetime import datetime + +MetaflowEvent = namedtuple("MetaflowEvent", ["name", "id", "timestamp", "type"]) +MetaflowEvent.__doc__ = """ + Container of metadata that identifies the event that triggered + the `Run` under consideration. + + Attributes + ---------- + name : str + name of the event. + id : str + unique identifier for the event. + timestamp : datetime + timestamp recording creation time for the event. + type : str + type for the event - one of `event` or `run` + """ + + +class Trigger(object): + """ + Defines a container of event triggers' metadata. + + """ + + def __init__(self, _meta=None): + if _meta is None: + _meta = [] + + _meta.sort(key=lambda x: x.get("timestamp") or float("-inf"), reverse=True) + + self._runs = None + self._events = [ + MetaflowEvent( + **{ + **obj, + # Add timestamp as datetime. Guaranteed to exist for Metaflow + # events - best effort for everything else. + **( + {"timestamp": datetime.fromtimestamp(obj["timestamp"])} + if obj.get("timestamp") + and isinstance(obj.get("timestamp"), int) + else {} + ), + } + ) + for obj in _meta + ] + + @classmethod + def from_runs(cls, run_objs): + run_objs.sort(key=lambda x: x.finished_at, reverse=True) + trigger = Trigger( + [ + { + "type": "run", + "timestamp": run_obj.finished_at, + "name": "metaflow.%s.%s" % (run_obj.parent.id, run_obj["end"].id), + "id": run_obj.end_task.pathspec, + } + for run_obj in run_objs + ] + ) + trigger._runs = run_objs + return trigger + + @property + def event(self): + """ + The `MetaflowEvent` object corresponding to the triggering event. + + If multiple events triggered the run, this property is the latest event. + + Returns + ------- + MetaflowEvent, optional + The latest event that triggered the run, if applicable. + """ + return next(iter(self._events), None) + + @property + def events(self): + """ + The list of `MetaflowEvent` objects correspondings to all the triggering events. 
+ + Returns + ------- + List[MetaflowEvent], optional + List of all events that triggered the run + """ + return list(self._events) or None + + @property + def run(self): + """ + The corresponding `Run` object if the triggering event is a Metaflow run. + + In case multiple runs triggered the run, this property is the latest run. + Returns `None` if none of the triggering events are a `Run`. + + Returns + ------- + Run, optional + Latest Run that triggered this run, if applicable. + """ + if self._runs is None: + self.runs + return next(iter(self._runs), None) + + @property + def runs(self): + """ + The list of `Run` objects in the triggering events. + Returns `None` if none of the triggering events are `Run` objects. + + Returns + ------- + List[Run], optional + List of runs that triggered this run, if applicable. + """ + if self._runs is None: + # to avoid circular import + from metaflow import Run + + self._runs = [ + Run( + # object id is the task pathspec for events that map to run + obj.id[: obj.id.index("/", obj.id.index("/") + 1)], + _namespace_check=False, + ) + for obj in self._events + if obj.type == "run" + ] + + return list(self._runs) or None + + def __getitem__(self, key): + """ + If triggering events are runs, `key` corresponds to the flow name of the triggering run. Returns a triggering `Run` object corresponding to the key. If triggering events are not runs, `key` corresponds to the event name and a `MetaflowEvent` object is returned. + """ + if self.runs: + for run in self.runs: + if run.path_components[0] == key: + return run + elif self.events: + for event in self.events: + if event.name == key: + return event + raise KeyError(key) + + def __iter__(self): + if self.events: + return iter(self.events) + return iter([]) + + def __contains__(self, id): + try: + return bool(self.__getitem__(id)) + except KeyError: + return False diff --git a/metaflow/integrations.py b/metaflow/integrations.py index 210a4c8e927..49816c72c56 100644 --- a/metaflow/integrations.py +++ b/metaflow/integrations.py @@ -8,7 +8,7 @@ from metaflow.extension_support.integrations import process_integration_aliases -# To enable an alias `metaflow.alias.get_s3_client` to +# To enable an alias `metaflow.integrations.get_s3_client` to # `metaflow.plugins.aws.aws_client.get_aws_client`, use the following: # # ALIASES_DESC = [("get_s3_client", ".plugins.aws.aws_client.get_aws_client")] @@ -17,6 +17,8 @@ # - name: name of the integration alias # - obj: object it points to # +ALIASES_DESC = [("ArgoEvent", ".plugins.argo.argo_events.ArgoEvent")] + # Aliases can be enabled or disabled through configuration or extensions: # - ENABLED_INTEGRATION_ALIAS: list of alias names to enable. 
# - TOGGLE_INTEGRATION_ALIAS: if ENABLED_INTEGRATION_ALIAS is not set anywhere diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 7c1a6d1dfa8..bb8c3cab62b 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -285,6 +285,16 @@ ARGO_WORKFLOWS_KUBERNETES_SECRETS = from_conf("ARGO_WORKFLOWS_KUBERNETES_SECRETS", "") ARGO_WORKFLOWS_ENV_VARS_TO_SKIP = from_conf("ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", "") +## +# Argo Events Configuration +## +ARGO_EVENTS_SERVICE_ACCOUNT = from_conf("ARGO_EVENTS_SERVICE_ACCOUNT") +ARGO_EVENTS_EVENT_BUS = from_conf("ARGO_EVENTS_EVENT_BUS", "default") +ARGO_EVENTS_EVENT_SOURCE = from_conf("ARGO_EVENTS_EVENT_SOURCE") +ARGO_EVENTS_EVENT = from_conf("ARGO_EVENTS_EVENT") +ARGO_EVENTS_WEBHOOK_URL = from_conf("ARGO_EVENTS_WEBHOOK_URL") + + ## # Airflow Configuration ## diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 16767a88237..06572f6776f 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -1,6 +1,6 @@ from metaflow.extension_support.plugins import ( - process_plugins, merge_lists, + process_plugins, resolve_plugins, ) @@ -56,6 +56,8 @@ ("conda_base", ".conda.conda_flow_decorator.CondaFlowDecorator"), ("schedule", ".aws.step_functions.schedule_decorator.ScheduleDecorator"), ("project", ".project_decorator.ProjectDecorator"), + ("trigger", ".events_decorator.TriggerDecorator"), + ("trigger_on_finish", ".events_decorator.TriggerOnFinishDecorator"), ] # Add environments here @@ -137,28 +139,29 @@ def get_plugin_cli(): AWS_CLIENT_PROVIDERS = resolve_plugins("aws_client_provider") SECRETS_PROVIDERS = resolve_plugins("secrets_provider") +from .cards.card_modules import MF_EXTERNAL_CARDS + # Cards; due to the way cards were designed, it is harder to make them fit # in the resolve_plugins mechanism. This should be OK because it is unlikely that # cards will need to be *removed*. No card should be too specific (for example, no # card should be something just for Airflow, or Argo or step-functions -- those should # be added externally). 
from .cards.card_modules.basic import ( - DefaultCard, - TaskSpecCard, - ErrorCard, BlankCard, + DefaultCard, DefaultCardJSON, + ErrorCard, + TaskSpecCard, ) from .cards.card_modules.test_cards import ( - TestErrorCard, - TestTimeoutCard, - TestMockCard, - TestPathSpecCard, TestEditableCard, TestEditableCard2, + TestErrorCard, + TestMockCard, TestNonEditableCard, + TestPathSpecCard, + TestTimeoutCard, ) -from .cards.card_modules import MF_EXTERNAL_CARDS CARDS = [ DefaultCard, diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 781e65f33aa..8c5c759e37b 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -1,55 +1,53 @@ -from io import BytesIO import json import os import random import string import sys from datetime import datetime, timedelta -from metaflow.includefile import FilePathClass +from io import BytesIO import metaflow.util as util +from metaflow import current from metaflow.decorators import flow_decorators from metaflow.exception import MetaflowException +from metaflow.includefile import FilePathClass from metaflow.metaflow_config import ( - SERVICE_HEADERS, - SERVICE_INTERNAL_URL, - CARD_S3ROOT, - DATASTORE_SYSROOT_S3, - DATATOOLS_S3ROOT, - KUBERNETES_SERVICE_ACCOUNT, - KUBERNETES_SECRETS, - AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS, - AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, - DATASTORE_SYSROOT_AZURE, - CARD_AZUREROOT, AIRFLOW_KUBERNETES_CONN_ID, AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, AIRFLOW_KUBERNETES_KUBECONFIG_FILE, - DATASTORE_SYSROOT_GS, + AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS, + AWS_SECRETS_MANAGER_DEFAULT_REGION, + AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, + CARD_AZUREROOT, CARD_GSROOT, + CARD_S3ROOT, + DATASTORE_SYSROOT_AZURE, + DATASTORE_SYSROOT_GS, + DATASTORE_SYSROOT_S3, + DATATOOLS_S3ROOT, DEFAULT_SECRETS_BACKEND_TYPE, - AWS_SECRETS_MANAGER_DEFAULT_REGION, + KUBERNETES_SECRETS, + KUBERNETES_SERVICE_ACCOUNT, S3_ENDPOINT_URL, + SERVICE_HEADERS, + SERVICE_INTERNAL_URL, +) +from metaflow.parameters import ( + DelayedEvaluationParameter, + JSONTypeClass, + deploy_time_eval, ) -from metaflow.parameters import DelayedEvaluationParameter, deploy_time_eval -from metaflow.plugins.kubernetes.kubernetes import Kubernetes # TODO: Move chevron to _vendor from metaflow.plugins.cards.card_modules import chevron +from metaflow.plugins.kubernetes.kubernetes import Kubernetes from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task -from metaflow.util import dict_to_cli_options, get_username, compress_list -from metaflow.parameters import JSONTypeClass +from metaflow.util import compress_list, dict_to_cli_options, get_username from . import airflow_utils +from .airflow_utils import AIRFLOW_MACROS, TASK_ID_XCOM_KEY, AirflowTask, Workflow from .exception import AirflowException from .sensors import SUPPORTED_SENSORS -from .airflow_utils import ( - TASK_ID_XCOM_KEY, - AirflowTask, - Workflow, - AIRFLOW_MACROS, -) -from metaflow import current AIRFLOW_DEPLOY_TEMPLATE_FILE = os.path.join(os.path.dirname(__file__), "dag.py") @@ -627,6 +625,14 @@ def _contains_foreach(self): return False def compile(self): + if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get( + "trigger_on_finish" + ): + raise AirflowException( + "Deploying flows with @trigger or @trigger_on_finish decorator(s) " + "to Airflow is not supported currently." + ) + # Visit every node of the flow and recursively build the state machine. 
def _visit(node, workflow, exit_node=None): kube_deco = dict( diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 91903037f84..55cedb1fcc4 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -1,23 +1,23 @@ +import base64 import os import re import sys -import base64 +from hashlib import sha1 + from metaflow import current, decorators from metaflow._vendor import click from metaflow.exception import MetaflowException, MetaflowInternalError from metaflow.package import MetaflowPackage -from hashlib import sha1 -from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator -from metaflow.util import get_username, to_bytes, to_unicode - -from .airflow import Airflow -from .exception import AirflowException, NotSupportedException - from metaflow.plugins.aws.step_functions.production_token import ( load_token, new_token, store_token, ) +from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator +from metaflow.util import get_username, to_bytes, to_unicode + +from .airflow import Airflow +from .exception import AirflowException, NotSupportedException class IncorrectProductionToken(MetaflowException): @@ -372,9 +372,9 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): seen.add(norm) if "default" not in param.kwargs: raise MetaflowException( - "Parameter *%s* does not have a " - "default value. " + "Parameter *%s* does not have a default value. " "A default value is required for parameters when deploying flows on Airflow." + % param.name ) # check for other compute related decorators. _validate_foreach_constraints(graph) diff --git a/metaflow/plugins/argo/argo_client.py b/metaflow/plugins/argo/argo_client.py index 0f64ce0b02a..0a1a2d73a7f 100644 --- a/metaflow/plugins/argo/argo_client.py +++ b/metaflow/plugins/argo/argo_client.py @@ -12,14 +12,13 @@ class ArgoClientException(MetaflowException): class ArgoClient(object): def __init__(self, namespace=None): - - self._kubernetes_client = KubernetesClient() + self._client = KubernetesClient() self._namespace = namespace or "default" self._group = "argoproj.io" self._version = "v1alpha1" def get_workflow_template(self, name): - client = self._kubernetes_client.get() + client = self._client.get() try: return client.CustomObjectsApi().get_namespaced_custom_object( group=self._group, @@ -38,7 +37,7 @@ def get_workflow_template(self, name): def register_workflow_template(self, name, workflow_template): # Unfortunately, Kubernetes client does not handle optimistic # concurrency control by itself unlike kubectl - client = self._kubernetes_client.get() + client = self._client.get() try: workflow_template["metadata"][ "resourceVersion" @@ -88,7 +87,7 @@ def register_workflow_template(self, name, workflow_template): ) def trigger_workflow_template(self, name, parameters={}): - client = self._kubernetes_client.get() + client = self._client.get() body = { "apiVersion": "argoproj.io/v1alpha1", "kind": "Workflow", @@ -119,7 +118,7 @@ def trigger_workflow_template(self, name, parameters={}): def schedule_workflow_template(self, name, schedule=None, timezone=None): # Unfortunately, Kubernetes client does not handle optimistic # concurrency control by itself unlike kubectl - client = self._kubernetes_client.get() + client = self._client.get() body = { "apiVersion": "argoproj.io/v1alpha1", "kind": "CronWorkflow", @@ -181,3 +180,77 @@ def schedule_workflow_template(self, name, schedule=None, 
timezone=None): raise ArgoClientException( json.loads(e.body)["message"] if e.body is not None else e.reason ) + + def register_sensor(self, name, sensor=None): + if sensor is None: + sensor = {} + # Unfortunately, Kubernetes client does not handle optimistic + # concurrency control by itself unlike kubectl + client = self._client.get() + if not sensor: + sensor["metadata"] = {} + + try: + sensor["metadata"][ + "resourceVersion" + ] = client.CustomObjectsApi().get_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._namespace, + plural="sensors", + name=name, + )[ + "metadata" + ][ + "resourceVersion" + ] + except client.rest.ApiException as e: + # Sensor does not exist and we want to add one + if e.status == 404: + if sensor.get("kind") is None: + return + try: + return client.CustomObjectsApi().create_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._namespace, + plural="sensors", + body=sensor, + ) + except client.rest.ApiException as e: + raise ArgoClientException( + json.loads(e.body)["message"] + if e.body is not None + else e.reason + ) + else: + raise ArgoClientException( + json.loads(e.body)["message"] if e.body is not None else e.reason + ) + # Since sensors occupy real resources, delete existing sensor if needed + if sensor.get("kind") is None: + try: + return client.CustomObjectsApi().delete_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._namespace, + plural="sensors", + name=name, + ) + except client.rest.ApiException as e: + raise ArgoClientException( + json.loads(e.body)["message"] if e.body is not None else e.reason + ) + try: + return client.CustomObjectsApi().replace_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._namespace, + plural="sensors", + body=sensor, + name=name, + ) + except client.rest.ApiException as e: + raise ArgoClientException( + json.loads(e.body)["message"] if e.body is not None else e.reason + ) diff --git a/metaflow/plugins/argo/argo_events.py b/metaflow/plugins/argo/argo_events.py new file mode 100644 index 00000000000..94c7c0d5d15 --- /dev/null +++ b/metaflow/plugins/argo/argo_events.py @@ -0,0 +1,98 @@ +import json +import os +import sys +import time +import urllib +import uuid +from datetime import datetime + +from metaflow.exception import MetaflowException +from metaflow.metaflow_config import ARGO_EVENTS_WEBHOOK_URL + + +class ArgoEventException(MetaflowException): + headline = "Argo Event Exception" + + +class ArgoEvent(object): + def __init__( + self, name, url=ARGO_EVENTS_WEBHOOK_URL, payload=None, access_token=None + ): + # TODO: Introduce support for NATS + self._name = name + self._url = url + self._payload = payload or {} + self._access_token = access_token + + def add_to_payload(self, key, value): + self._payload[key] = str(value) + return self + + def publish(self, payload=None, force=False, ignore_errors=True): + if payload == None: + payload = {} + # Publish event iff forced or running on Argo Workflows + if force or os.environ.get("ARGO_WORKFLOW_TEMPLATE"): + try: + headers = {} + if self._access_token: + # TODO: Test with bearer tokens + headers = {"Authorization": "Bearer {}".format(self._access_token)} + # TODO: do we need to worry about certs? 
+ + # Use urllib to avoid introducing any dependency in Metaflow + request = urllib.request.Request( + self._url, + method="POST", + headers={"Content-Type": "application/json", **headers}, + data=json.dumps( + { + "name": self._name, + "payload": { + # Add default fields here... + "name": self._name, + "id": str(uuid.uuid4()), + "timestamp": int(time.time()), + "utc_date": datetime.utcnow().strftime("%Y%m%d"), + "generated-by-metaflow": True, + **self._payload, + **payload, + }, + } + ).encode("utf-8"), + ) + retries = 3 + backoff_factor = 2 + + for i in range(retries): + try: + urllib.request.urlopen(request, timeout=10.0) + print( + "Argo Event (%s) published." % self._name, file=sys.stderr + ) + break + except urllib.error.HTTPError as e: + # TODO: Retry retryable HTTP error codes + raise e + except urllib.error.URLError as e: + if i == retries - 1: + raise e + else: + time.sleep(backoff_factor**i) + except Exception as e: + msg = "Unable to publish Argo Event (%s): %s" % (self._name, e) + if ignore_errors: + print(msg, file=sys.stderr) + else: + raise ArgoEventException(msg) + else: + msg = ( + "Argo Event (%s) was not published. Use " + + "ArgoEvent(...).publish(..., force=True) " + + "to force publish." + ) % self._name + + if ignore_errors: + print(msg, file=sys.stderr) + else: + raise ArgoEventException(msg) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 7f5a2d4404e..51e2aeff0e6 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1,38 +1,52 @@ +import base64 import json import os +import re import shlex import sys from collections import defaultdict +from hashlib import sha1 from metaflow import current from metaflow.decorators import flow_decorators from metaflow.exception import MetaflowException from metaflow.metaflow_config import ( - SERVICE_HEADERS, - SERVICE_INTERNAL_URL, + ARGO_EVENTS_EVENT, + ARGO_EVENTS_EVENT_BUS, + ARGO_EVENTS_EVENT_SOURCE, + ARGO_EVENTS_SERVICE_ACCOUNT, + ARGO_EVENTS_WEBHOOK_URL, + ARGO_WORKFLOWS_ENV_VARS_TO_SKIP, + ARGO_WORKFLOWS_KUBERNETES_SECRETS, + AWS_SECRETS_MANAGER_DEFAULT_REGION, + AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, + CARD_AZUREROOT, + CARD_GSROOT, CARD_S3ROOT, + DATASTORE_SYSROOT_AZURE, + DATASTORE_SYSROOT_GS, DATASTORE_SYSROOT_S3, DATATOOLS_S3ROOT, DEFAULT_METADATA, + DEFAULT_SECRETS_BACKEND_TYPE, + KUBERNETES_FETCH_EC2_METADATA, KUBERNETES_NAMESPACE, KUBERNETES_NODE_SELECTOR, KUBERNETES_SANDBOX_INIT_SCRIPT, KUBERNETES_SECRETS, - KUBERNETES_FETCH_EC2_METADATA, S3_ENDPOINT_URL, - AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, - DATASTORE_SYSROOT_AZURE, - DATASTORE_SYSROOT_GS, - CARD_AZUREROOT, - CARD_GSROOT, - DEFAULT_SECRETS_BACKEND_TYPE, - AWS_SECRETS_MANAGER_DEFAULT_REGION, - ARGO_WORKFLOWS_KUBERNETES_SECRETS, - ARGO_WORKFLOWS_ENV_VARS_TO_SKIP, + SERVICE_HEADERS, + SERVICE_INTERNAL_URL, ) from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars from metaflow.parameters import deploy_time_eval -from metaflow.util import compress_list, dict_to_cli_options, to_camelcase +from metaflow.util import ( + compress_list, + dict_to_cli_options, + to_bytes, + to_camelcase, + to_unicode, +) from .argo_client import ArgoClient @@ -47,16 +61,15 @@ class ArgoWorkflowsSchedulingException(MetaflowException): # List of future enhancements - # 1. Configure Argo metrics. -# 2. Support Argo Events. -# 3. Support resuming failed workflows within Argo Workflows. -# 4. 
Support gang-scheduled clusters for distributed PyTorch/TF - One option is to +# 2. Support resuming failed workflows within Argo Workflows. +# 3. Support gang-scheduled clusters for distributed PyTorch/TF - One option is to # use volcano - https://github.com/volcano-sh/volcano/tree/master/example/integrations/argo -# 5. Support GitOps workflows. -# 6. Add Metaflow tags to labels/annotations. -# 7. Support Multi-cluster scheduling - https://github.com/argoproj/argo-workflows/issues/3523#issuecomment-792307297 -# 8. Support for workflow notifications. -# 9. Support R lang. -# 10.Ping @savin at slack.outerbounds.co for any feature request. +# 4. Support GitOps workflows. +# 5. Add Metaflow tags to labels/annotations. +# 6. Support Multi-cluster scheduling - https://github.com/argoproj/argo-workflows/issues/3523#issuecomment-792307297 +# 7. Support for workflow notifications. +# 8. Support R lang. +# 9. Ping @savin at slack.outerbounds.co for any feature request. class ArgoWorkflows(object): @@ -79,6 +92,7 @@ def __init__( max_workers=None, workflow_timeout=None, workflow_priority=None, + auto_emit_argo_events=False, ): # Some high-level notes - # @@ -121,16 +135,21 @@ def __init__( self.max_workers = max_workers self.workflow_timeout = workflow_timeout self.workflow_priority = workflow_priority + self.auto_emit_argo_events = auto_emit_argo_events self.parameters = self._process_parameters() - self._workflow_template = self._compile() - self._cron = self._get_cron() + self.triggers, self.trigger_options = self._process_triggers() + self._schedule, self._timezone = self._get_schedule() + + self._workflow_template = self._compile_workflow_template() + self._sensor = self._compile_sensor() def __str__(self): return str(self._workflow_template) def deploy(self): try: + # Register workflow template. ArgoClient(namespace=KUBERNETES_NAMESPACE).register_workflow_template( self.name, self._workflow_template.to_json() ) @@ -177,32 +196,61 @@ def trigger(cls, name, parameters=None): except Exception as e: raise ArgoWorkflowsException(str(e)) - def _get_cron(self): + def _get_schedule(self): schedule = self.flow._flow_decorators.get("schedule") if schedule: # Remove the field "Year" if it exists schedule = schedule[0] return " ".join(schedule.schedule.split()[:5]), schedule.timezone - return None + return None, None def schedule(self): try: - if self._cron is None: - cron, timezone = None, None - else: - cron, timezone = self._cron ArgoClient(namespace=KUBERNETES_NAMESPACE).schedule_workflow_template( - self.name, cron, timezone + self.name, self._schedule, self._timezone + ) + # Register sensor. Unfortunately, Argo Events Sensor names don't allow for + # dots (sensors run into an error) which rules out self.name :( + # Metaflow will overwrite any existing sensor. + ArgoClient(namespace=KUBERNETES_NAMESPACE).register_sensor( + self.name.replace(".", "-"), + self._sensor.to_json() if self._sensor else {}, ) except Exception as e: raise ArgoWorkflowsSchedulingException(str(e)) def trigger_explanation(self): - if self._cron: + # Trigger explanation for cron workflows + if self.flow._flow_decorators.get("schedule"): return ( "This workflow triggers automatically via the CronWorkflow *%s*." % self.name ) + + # Trigger explanation for @trigger + elif self.flow._flow_decorators.get("trigger"): + return ( + "This workflow triggers automatically when the upstream %s " + "is/are published." 
+ % self.list_to_prose( + [event["name"] for event in self.triggers], "event" + ) + ) + + # Trigger explanation for @trigger_on_finish + elif self.flow._flow_decorators.get("trigger_on_finish"): + return ( + "This workflow triggers automatically when the upstream %s succeed(s)" + % self.list_to_prose( + [ + # Truncate prefix `metaflow.` and suffix `.end` from event name + event["name"][len("metaflow.") : -len(".end")] + for event in self.triggers + ], + "flow", + ) + ) + else: return "No triggers defined. You need to launch this workflow manually." @@ -229,8 +277,8 @@ def get_existing_deployment(cls, name): return None def _process_parameters(self): - parameters = [] - has_schedule = self._get_cron() is not None + parameters = {} + has_schedule = self.flow._flow_decorators.get("schedule") is not None seen = set() for var, param in self.flow._get_parameters(): # Throw an exception if the parameter is specified twice. @@ -248,7 +296,6 @@ def _process_parameters(self): # parameters with no defaults. We currently don't have any notion # of data triggers in Argo Workflows. - # TODO: Support Argo Events for data triggering in the near future. if "default" not in param.kwargs and is_required and has_schedule: raise MetaflowException( "The parameter *%s* does not have a default and is required. " @@ -257,15 +304,130 @@ def _process_parameters(self): ) value = deploy_time_eval(param.kwargs.get("default")) # If the value is not required and the value is None, we set the value to - # the JSON equivalent of None to please argo-workflows. + # the JSON equivalent of None to please argo-workflows. Unfortunately it + # has the side effect of casting the parameter value to string null during + # execution - which needs to be fixed imminently. if not is_required or value is not None: value = json.dumps(value) - parameters.append( - dict(name=param.name, value=value, description=param.kwargs.get("help")) + parameters[param.name] = dict( + name=param.name, + value=value, + description=param.kwargs.get("help"), + is_required=is_required, ) return parameters - def _compile(self): + def _process_triggers(self): + # Impute triggers for Argo Workflow Template specified through @trigger and + # @trigger_on_finish decorators + + # Disallow usage of @trigger and @trigger_on_finish together for now. + if self.flow._flow_decorators.get("trigger") and self.flow._flow_decorators.get( + "trigger_on_finish" + ): + raise ArgoWorkflowsException( + "Argo Workflows doesn't support both *@trigger* and " + "*@trigger_on_finish* decorators concurrently yet. Use one or the " + "other for now." + ) + triggers = [] + options = None + + # @trigger decorator + if self.flow._flow_decorators.get("trigger"): + # Parameters are not duplicated, and exist in the flow. + # additionally, convert them to lower case since metaflow parameters are + # case insensitive. + seen = set() + params = set( + [param.name.lower() for var, param in self.flow._get_parameters()] + ) + for event in self.flow._flow_decorators.get("trigger")[0].triggers: + parameters = {} + # TODO: Add a check to guard against names starting with numerals(?) + if not re.match(r"^[A-Za-z0-9_.-]+$", event["name"]): + raise ArgoWorkflowsException( + "Invalid event name *%s* in *@trigger* decorator. Only " + "alphanumeric characters, underscores(_), dashes(-) and " + "dots(.) are allowed." 
% event["name"] + ) + for key, value in event.get("parameters", {}).items(): + if not re.match(r"^[A-Za-z0-9_]+$", value): + raise ArgoWorkflowsException( + "Invalid event payload key *%s* for event *%s* in " + "*@trigger* decorator. Only alphanumeric characters and " + "underscores(_) are allowed." % (value, event["name"]) + ) + if key.lower() not in params: + raise ArgoWorkflowsException( + "Parameter *%s* defined in the event mappings for " + "*@trigger* decorator not found in the flow." % key + ) + if key.lower() in seen: + raise ArgoWorkflowsException( + "Duplicate entries for parameter *%s* defined in the " + "event mappings for *@trigger* decorator." % key.lower() + ) + seen.add(key.lower()) + parameters[key.lower()] = value + event["parameters"] = parameters + event["type"] = "event" + triggers.extend(self.flow._flow_decorators.get("trigger")[0].triggers) + + # Set automatic parameter mapping iff only a single event dependency is + # specified with no explicit parameter mapping + if len(triggers) == 1 and not triggers[0].get("parameters"): + triggers[0]["parameters"] = dict(zip(params, params)) + options = self.flow._flow_decorators.get("trigger")[0].options + + # @trigger_on_finish decorator + if self.flow._flow_decorators.get("trigger_on_finish"): + for event in self.flow._flow_decorators.get("trigger_on_finish")[ + 0 + ].triggers: + # Actual filters are deduced here since we don't have access to + # the current object in the @trigger_on_finish decorator. + triggers.append( + { + "name": "metaflow.%s.end" + % ".".join( + v + for v in [ + event.get("project") or current.get("project_name"), + event.get("branch") or current.get("branch_name"), + event["flow"], + ] + if v + ), + "filters": { + "auto-generated-by-metaflow": True, + "project_name": event.get("project") + or current.get("project_name"), + "branch_name": event.get("branch") + or current.get("branch_name"), + # TODO: Add a time filters to guard against cached events + }, + "type": "run", + "flow": event["flow"], + } + ) + options = self.flow._flow_decorators.get("trigger_on_finish")[0].options + + for event in triggers: + # Assign a sanitized name since we need this at many places to please + # Argo Events sensors. There is a slight possibility of name collision + # but quite unlikely for us to worry about at this point. + event["sanitized_name"] = event["name"] + if any([x in event["name"] for x in [".", "-"]]): + event["sanitized_name"] = "%s_%s" % ( + event["name"].replace(".", "").replace("-", ""), + to_unicode( + base64.b32encode(sha1(to_bytes(event["name"])).digest()) + )[:4].lower(), + ) + return triggers, options + + def _compile_workflow_template(self): # This method compiles a Metaflow FlowSpec into Argo WorkflowTemplate # # WorkflowTemplate @@ -292,8 +454,6 @@ def _compile(self): # generate container templates at the top level (in WorkflowSpec) and maintain # references to them within the DAGTask. - labels = {"app.kubernetes.io/part-of": "metaflow"} - annotations = { "metaflow/production_token": self.production_token, "metaflow/owner": self.username, @@ -339,6 +499,8 @@ def _compile(self): # the size of the generated YAML by a tiny bit. # .automount_service_account_token() # TODO: Support ImagePullSecrets for Argo & Kubernetes + # Not strictly needed since a very valid workaround exists + # https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#add-imagepullsecrets-to-a-service-account # .image_pull_secrets(...) 
# Limit workflow parallelism .parallelism(self.max_workers) @@ -368,7 +530,17 @@ def _compile(self): .value(parameter["value"]) .description(parameter.get("description")) # TODO: Better handle IncludeFile in Argo Workflows UI. - for parameter in self.parameters + for parameter in self.parameters.values() + ] + + [ + # Introduce non-required parameters for argo events so + # that the entire event payload can be accessed within the + # run. The parameter name is hashed to ensure that + # there won't be any collisions with Metaflow parameters. + Parameter(event["sanitized_name"]) + .value(json.dumps(None)) # None in Argo Workflows world. + .description("auto-set by metaflow. safe to ignore.") + for event in self.triggers ] ) ) @@ -603,13 +775,27 @@ def _container_templates(self): task_id = "$METAFLOW_TASK_ID" # Resolve retry strategy. - ( - user_code_retries, - total_retries, - retry_count, - minutes_between_retries, - ) = self._get_retries(node) + max_user_code_retries = 0 + max_error_retries = 0 + minutes_between_retries = "2" + for decorator in node.decorators: + if decorator.name == "retry": + minutes_between_retries = decorator.attributes.get( + "minutes_between_retries", minutes_between_retries + ) + user_code_retries, error_retries = decorator.step_task_retry_count() + max_user_code_retries = max(max_user_code_retries, user_code_retries) + max_error_retries = max(max_error_retries, error_retries) + + user_code_retries = max_user_code_retries + total_retries = max_user_code_retries + max_error_retries + # {{retries}} is only available if retryStrategy is specified + retry_count = ( + "{{retries}}" if max_user_code_retries + max_error_retries else 0 + ) + minutes_between_retries = int(minutes_between_retries) + # Configure log capture. mflog_expr = export_mflog_env_vars( datastore_type=self.flow_datastore.TYPE, stdout_path="$PWD/.logs/mflog_stdout", @@ -663,7 +849,8 @@ def _container_templates(self): "--event-logger=%s" % self.event_logger.TYPE, "--monitor=%s" % self.monitor.TYPE, "--no-pylint", - "--with=argo_workflows_internal", + "--with=argo_workflows_internal:auto-emit-argo-events=%s" + % self.auto_emit_argo_events, ] if node.name == "start": @@ -682,7 +869,7 @@ def _container_templates(self): # {{foo.bar['param_name']}} "--%s={{workflow.parameters.%s}}" % (parameter["name"], parameter["name"]) - for parameter in self.parameters + for parameter in self.parameters.values() ] ) if self.tags: @@ -749,8 +936,8 @@ def _container_templates(self): ): raise ArgoWorkflowsException( "Multi-namespace Kubernetes execution of flows in Argo Workflows " - "is not currently supported. \nStep *%s* is trying to override the " - "default Kubernetes namespace *%s*." + "is not currently supported. \nStep *%s* is trying to override " + "the default Kubernetes namespace *%s*." % (node.name, KUBERNETES_NAMESPACE) ) @@ -790,6 +977,15 @@ def _container_templates(self): "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes", "METAFLOW_OWNER": self.username, }, + **{ + # Configuration for Argo Events + # TODO: Move this to @kubernetes decorator instead. 
+ "METAFLOW_ARGO_EVENTS_EVENT": ARGO_EVENTS_EVENT, + "METAFLOW_ARGO_EVENTS_EVENT_BUS": ARGO_EVENTS_EVENT_BUS, + "METAFLOW_ARGO_EVENTS_EVENT_SOURCE": ARGO_EVENTS_EVENT_SOURCE, + "METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT": ARGO_EVENTS_SERVICE_ACCOUNT, + "METAFLOW_ARGO_EVENTS_WEBHOOK_URL": ARGO_EVENTS_WEBHOOK_URL, + }, **{ # Some optional values for bookkeeping "METAFLOW_FLOW_NAME": self.flow.name, @@ -811,23 +1007,31 @@ def _container_templates(self): # support Metaflow sandboxes env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT - # secrets stuff + # support for @secret env["METAFLOW_DEFAULT_SECRETS_BACKEND_TYPE"] = DEFAULT_SECRETS_BACKEND_TYPE env[ "METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_REGION" ] = AWS_SECRETS_MANAGER_DEFAULT_REGION - # Azure stuff + # support for Azure env[ "METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT" ] = AZURE_STORAGE_BLOB_SERVICE_ENDPOINT env["METAFLOW_DATASTORE_SYSROOT_AZURE"] = DATASTORE_SYSROOT_AZURE env["METAFLOW_CARD_AZUREROOT"] = CARD_AZUREROOT - # GCP stuff + # support for GCP env["METAFLOW_DATASTORE_SYSROOT_GS"] = DATASTORE_SYSROOT_GS env["METAFLOW_CARD_GSROOT"] = CARD_GSROOT + # Map Argo Events payload (if any) to environment variables + if self.triggers: + for event in self.triggers: + env[ + "METAFLOW_ARGO_EVENT_PAYLOAD_%s_%s" + % (event["type"], event["sanitized_name"]) + ] = ("{{workflow.parameters.%s}}" % event["sanitized_name"]) + metaflow_version = self.environment.get_environment_info() metaflow_version["flow_name"] = self.graph.name metaflow_version["production_token"] = self.production_token @@ -860,11 +1064,11 @@ def _container_templates(self): # It makes no sense to set env vars to None (shows up as "None" string) # Also we skip some env vars (e.g. in case we want to pull them from KUBERNETES_SECRETS) - env_vars_to_skip = set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(",")) env = { k: v for k, v in env.items() - if v is not None and k not in env_vars_to_skip + if v is not None + and k not in set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(",")) } # Tmpfs variables @@ -1010,27 +1214,358 @@ def _container_templates(self): ) ) - def _get_retries(self, node): - max_user_code_retries = 0 - max_error_retries = 0 - minutes_between_retries = "2" - for deco in node.decorators: - if deco.name == "retry": - minutes_between_retries = deco.attributes.get( - "minutes_between_retries", minutes_between_retries - ) - user_code_retries, error_retries = deco.step_task_retry_count() - max_user_code_retries = max(max_user_code_retries, user_code_retries) - max_error_retries = max(max_error_retries, error_retries) + def _compile_sensor(self): + # This method compiles a Metaflow @trigger decorator into Argo Events Sensor. + # + # Event payload is assumed as - + # ---------------------------------------------------------------------- + # | name | name of the event | + # | payload | | + # | parameter name... | parameter value | + # | parameter name... | parameter value | + # | parameter name... | parameter value | + # | parameter name... | parameter value | + # ---------------------------------------------------------------------- + # + # + # + # At the moment, every event-triggered workflow template has a dedicated + # sensor (which can potentially be a bit wasteful in scenarios with high + # volume of workflows and low volume of events) - introducing a many-to-one + # sensor-to-workflow-template solution is completely in the realm of + # possibilities (modulo consistency and transactional guarantees). 
+ # + # This implementation side-steps the more prominent/popular usage of event + # sensors where the sensor is responsible for submitting the workflow object + # directly. Instead we construct the equivalent behavior of `argo submit + # --from` to reference an already submitted workflow template. This ensures + # that Metaflow generated Kubernetes objects can be easily reasoned about. + # + # At the moment, Metaflow configures for webhook and NATS event sources. If you + # are interested in the HA story for either - please follow this link + # https://argoproj.github.io/argo-events/eventsources/ha/. + # + # There is some potential for confusion between Metaflow concepts and Argo + # Events concepts, particularly for event names. Argo Events EventSource + # define an event name which is different than the Metaflow event name - think + # of Argo Events name as a type of event (conceptually like topics in Kafka) + # while Metaflow event names are a field within the Argo Event. + # + # + # At the moment, there is parity between the labels and annotations for + # workflow templates and sensors - that may or may not be the case in the + # future. + # + # Unfortunately, there doesn't seem to be a way to create a sensor filter + # where one (or more) fields across multiple events have the same value. + # Imagine a scenario where we want to trigger a flow iff both the dependent + # events agree on the same date field. Unfortunately, there isn't any way in + # Argo Events (as of apr'23) to ensure that. + + # Nothing to do here - let's short circuit and exit. + if not self.triggers: + return {} + + # Ensure proper configuration is available for Argo Events + if ARGO_EVENTS_EVENT is None: + raise ArgoWorkflowsException( + "An Argo Event name hasn't been configured for your deployment yet. " + "Please see this article for more details on event names - " + "https://argoproj.github.io/argo-events/eventsources/naming/. " + "It is very likely that all events for your deployment share the " + "same name. You can configure it by executing " + "`metaflow configure events` or setting METAFLOW_ARGO_EVENTS_EVENT " + "in your configuration. If in doubt, reach out for support at " + "http://chat.metaflow.org" + ) + # Unfortunately argo events requires knowledge of event source today. + # Hopefully, some day this requirement can be removed and events can be truly + # impervious to their source and destination. + if ARGO_EVENTS_EVENT_SOURCE is None: + raise ArgoWorkflowsException( + "An Argo Event Source name hasn't been configured for your deployment " + "yet. Please see this article for more details on event names - " + "https://argoproj.github.io/argo-events/eventsources/naming/. " + "You can configure it by executing `metaflow configure events` or " + "setting METAFLOW_ARGO_EVENTS_EVENT_SOURCE in your configuration. If " + "in doubt, reach out for support at http://chat.metaflow.org" + ) + # Service accounts are a hard requirement since we utilize the + # argoWorkflow trigger for resource sensors today. + if ARGO_EVENTS_SERVICE_ACCOUNT is None: + raise ArgoWorkflowsException( + "An Argo Event service account hasn't been configured for your " + "deployment yet. Please see this article for more details on event " + "names - https://argoproj.github.io/argo-events/service-accounts/. " + "You can configure it by executing `metaflow configure events` or " + "setting METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT in your configuration. 
" + "If in doubt, reach out for support at http://chat.metaflow.org" + ) + + try: + # Kubernetes is a soft dependency for generating Argo objects. + # We can very well remove this dependency for Argo with the downside of + # adding a bunch more json bloat classes (looking at you... V1Container) + from kubernetes import client as kubernetes_sdk + except (NameError, ImportError): + raise MetaflowException( + "Could not import Python package 'kubernetes'. Install kubernetes " + "sdk (https://pypi.org/project/kubernetes/) first." + ) + + labels = {"app.kubernetes.io/part-of": "metaflow"} + + annotations = { + "metaflow/production_token": self.production_token, + "metaflow/owner": self.username, + "metaflow/user": "argo-workflows", + "metaflow/flow_name": self.flow.name, + } + if current.get("project_name"): + annotations.update( + { + "metaflow/project_name": current.project_name, + "metaflow/branch_name": current.branch_name, + "metaflow/project_flow_name": current.project_flow_name, + } + ) return ( - max_user_code_retries, - max_user_code_retries + max_error_retries, - # {{retries}} is only available if retryStrategy is specified - "{{retries}}" if max_user_code_retries + max_error_retries else 0, - int(minutes_between_retries), + Sensor() + .metadata( + # Sensor metadata. + ObjectMeta() + .name(self.name.replace(".", "-")) + .namespace(KUBERNETES_NAMESPACE) + .label("app.kubernetes.io/name", "metaflow-sensor") + .label("app.kubernetes.io/part-of", "metaflow") + .annotations(annotations) + ) + .spec( + SensorSpec().template( + # Sensor template. + SensorTemplate() + .metadata( + ObjectMeta() + .label("app.kubernetes.io/name", "metaflow-sensor") + .label("app.kubernetes.io/part-of", "metaflow") + .annotations(annotations) + ) + .container( + # Run sensor in guaranteed QoS. The sensor isn't doing a lot + # of work so we roll with minimal resource allocation. It is + # likely that in subsequent releases we will agressively lower + # sensor resources to pack more of them on a single node. + to_camelcase( + kubernetes_sdk.V1Container( + name="main", + resources=kubernetes_sdk.V1ResourceRequirements( + requests={ + "cpu": "100m", + "memory": "250Mi", + }, + limits={ + "cpu": "100m", + "memory": "250Mi", + }, + ), + ) + ) + ) + .service_account_name(ARGO_EVENTS_SERVICE_ACCOUNT) + # TODO (savin): Handle bypassing docker image rate limit errors. + ) + # Set sensor replica to 1 for now. + # TODO (savin): Allow for multiple replicas for HA. + .replicas(1) + # TODO: Support revision history limit to manage old deployments + # .revision_history_limit(...) + .event_bus_name(ARGO_EVENTS_EVENT_BUS) + # Workflow trigger. 
+ .trigger( + Trigger().template( + TriggerTemplate(self.name) + # Trigger a deployed workflow template + .argo_workflow_trigger( + ArgoWorkflowTrigger() + .source( + { + "resource": { + "apiVersion": "argoproj.io/v1alpha1", + "kind": "Workflow", + "metadata": { + "generateName": "%s-" % self.name, + "namespace": KUBERNETES_NAMESPACE, + }, + "spec": { + "arguments": { + "parameters": [ + Parameter(parameter["name"]) + .value(parameter["value"]) + .to_json() + for parameter in self.parameters.values() + ] + # Also consume event data + + [ + Parameter(event["sanitized_name"]) + .value(json.dumps(None)) + .to_json() + for event in self.triggers + ] + }, + "workflowTemplateRef": { + "name": self.name, + }, + }, + } + } + ) + .parameters( + [ + y + for x in list( + list( + TriggerParameter() + .src( + dependency_name=event["sanitized_name"], + # Technically, we don't need to create + # a payload carry-on and can stuff + # everything within the body. + data_key="body.payload.%s" % v, + # Unfortunately the sensor needs to + # record the default values for + # the parameters - there doesn't seem + # to be any way for us to skip + value=self.parameters[parameter_name][ + "value" + ], + ) + .dest( + # this undocumented (mis?)feature in + # argo-events allows us to reference + # parameters by name rather than index + "spec.arguments.parameters.#(name=%s).value" + % parameter_name + ) + for parameter_name, v in event.get( + "parameters", {} + ).items() + ) + for event in self.triggers + ) + for y in x + ] + + [ + # Map event payload to parameters for current + TriggerParameter() + .src( + dependency_name=event["sanitized_name"], + data_key="body.payload", + value=json.dumps(None), + ) + .dest( + "spec.arguments.parameters.#(name=%s).value" + % event["sanitized_name"] + ) + for event in self.triggers + ] + ) + # Reset trigger conditions ever so often by wiping + # away event tracking history on a schedule. + # @trigger(options={"reset_at": {"cron": , "timezone": }}) + # timezone is IANA standard, e.g. America/Los_Angeles + # TODO: Introduce "end_of_day", "end_of_hour" .. + ).conditions_reset( + cron=self.trigger_options.get("reset_at", {}).get("cron"), + timezone=self.trigger_options.get("reset_at", {}).get( + "timezone" + ), + ) + ) + ) + # Event dependencies. As of Mar' 23, Argo Events docs suggest using + # Jetstream event bus rather than NATS streaming bus since the later + # doesn't support multiple combos of the same event name and event + # source name. + .dependencies( + # Event dependencies don't entertain dots + EventDependency(event["sanitized_name"]).event_name( + ARGO_EVENTS_EVENT + ) + # TODO: Alternatively fetch this from @trigger config options + .event_source_name(ARGO_EVENTS_EVENT_SOURCE).filters( + # Ensure that event name matches and all required parameter + # fields are present in the payload. There is a possibility of + # dependency on an event where none of the fields are required. + # At the moment, this event is required but the restriction + # can be removed if needed. 
+ EventDependencyFilter().exprs( + [ + { + "expr": "name == '%s'" % event["name"], + "fields": [ + {"name": "name", "path": "body.payload.name"} + ], + } + ] + + [ + { + "expr": "true == true", # field name is present + "fields": [ + { + "name": "field", + "path": "body.payload.%s" % v, + } + ], + } + for parameter_name, v in event.get( + "parameters", {} + ).items() + # only for required parameters + if self.parameters[parameter_name]["is_required"] + ] + + [ + { + "expr": "field == '%s'" % v, # trigger_on_finish + "fields": [ + { + "name": "field", + "path": "body.payload.%s" % filter_key, + } + ], + } + for filter_key, v in event.get("filters", {}).items() + if v + ] + ) + ) + for event in self.triggers + ) + ) ) + def list_to_prose(self, items, singular): + items = ["*%s*" % item for item in items] + item_count = len(items) + plural = singular + "s" + item_type = singular + if item_count == 1: + result = items[0] + elif item_count == 2: + result = "%s and %s" % (items[0], items[1]) + item_type = plural + elif item_count > 2: + result = "%s and %s" % ( + ", ".join(items[0 : item_count - 1]), + items[item_count - 1], + ) + item_type = plural + else: + result = "" + if result: + result = "%s %s" % (result, item_type) + return result + # Helper classes to assist with JSON-foo. This can very well replaced with an explicit # dependency on argo-workflows Python SDK if this method turns out to be painful. @@ -1477,3 +2012,265 @@ def to_json(self): def __str__(self): return json.dumps(self.payload, indent=4) + + +class Sensor(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Sensor + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["apiVersion"] = "argoproj.io/v1alpha1" + self.payload["kind"] = "Sensor" + + def metadata(self, object_meta): + self.payload["metadata"] = object_meta.to_json() + return self + + def spec(self, sensor_spec): + self.payload["spec"] = sensor_spec.to_json() + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +class SensorSpec(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.SensorSpec + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def replicas(self, replicas=1): + # TODO: Make number of deployment replicas configurable. 
+ self.payload["replicas"] = int(replicas) + return self + + def template(self, sensor_template): + self.payload["template"] = sensor_template.to_json() + return self + + def trigger(self, trigger): + if "triggers" not in self.payload: + self.payload["triggers"] = [] + self.payload["triggers"].append(trigger.to_json()) + return self + + def dependencies(self, dependencies): + if "dependencies" not in self.payload: + self.payload["dependencies"] = [] + for dependency in dependencies: + self.payload["dependencies"].append(dependency.to_json()) + return self + + def event_bus_name(self, event_bus_name): + self.payload["eventBusName"] = event_bus_name + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.to_json(), indent=4) + + +class SensorTemplate(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Template + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def service_account_name(self, service_account_name): + self.payload["serviceAccountName"] = service_account_name + return self + + def metadata(self, object_meta): + self.payload["metadata"] = object_meta.to_json() + return self + + def container(self, container): + # Luckily this can simply be V1Container and we are spared from writing more + # boilerplate - https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Container.md. + self.payload["container"] = container + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.to_json(), indent=4) + + +class EventDependency(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.EventDependency + + def __init__(self, name): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["name"] = name + + def event_source_name(self, event_source_name): + self.payload["eventSourceName"] = event_source_name + return self + + def event_name(self, event_name): + self.payload["eventName"] = event_name + return self + + def filters(self, event_dependency_filter): + self.payload["filters"] = event_dependency_filter.to_json() + return self + + def transform(self, event_dependency_transformer=None): + if event_dependency_transformer: + self.payload["transform"] = event_dependency_transformer + return self + + def filters_logical_operator(self, logical_operator): + self.payload["filtersLogicalOperator"] = logical_operator.to_json() + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.to_json(), indent=4) + + +class EventDependencyFilter(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.EventDependencyFilter + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def exprs(self, exprs): + self.payload["exprs"] = exprs + return self + + def context(self, event_context): + self.payload["context"] = event_context + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.to_json(), indent=4) + + +class Trigger(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Trigger + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def template(self, trigger_template): + self.payload["template"] = trigger_template.to_json() + return self + + def parameters(self, trigger_parameters): + if "parameters" 
not in self.payload: + self.payload["parameters"] = [] + for trigger_parameter in trigger_parameters: + self.payload["parameters"].append(trigger_parameter.to_json()) + return self + + def policy(self, trigger_policy): + self.payload["policy"] = trigger_policy.to_json() + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.to_json(), indent=4) + + +class TriggerTemplate(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.TriggerTemplate + + def __init__(self, name): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["name"] = name + + def argo_workflow_trigger(self, argo_workflow_trigger): + self.payload["argoWorkflow"] = argo_workflow_trigger.to_json() + return self + + def conditions_reset(self, cron, timezone): + if cron: + self.payload["conditionsReset"] = [ + {"byTime": {"cron": cron, "timezone": timezone}} + ] + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +class ArgoWorkflowTrigger(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.ArgoWorkflowTrigger + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["operation"] = "submit" + self.payload["group"] = "argoproj.io" + self.payload["version"] = "v1alpha1" + self.payload["resource"] = "workflows" + + def source(self, source): + self.payload["source"] = source + return self + + def parameters(self, trigger_parameters): + if "parameters" not in self.payload: + self.payload["parameters"] = [] + for trigger_parameter in trigger_parameters: + self.payload["parameters"].append(trigger_parameter.to_json()) + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +class TriggerParameter(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.TriggerParameter + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def src(self, dependency_name, data_key, value): + self.payload["src"] = { + "dependencyName": dependency_name, + "dataKey": data_key, + "value": value, + # explicitly set it to false to ensure proper deserialization + "useRawData": False, + } + return self + + def dest(self, dest): + self.payload["dest"] = dest + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py index 0249344f457..29e8d1f21fb 100644 --- a/metaflow/plugins/argo/argo_workflows_cli.py +++ b/metaflow/plugins/argo/argo_workflows_cli.py @@ -8,11 +8,9 @@ from metaflow import JSONType, current, decorators, parameters from metaflow._vendor import click -from metaflow.metaflow_config import SERVICE_VERSION_CHECK, UI_URL from metaflow.exception import MetaflowException, MetaflowInternalError +from metaflow.metaflow_config import SERVICE_VERSION_CHECK, UI_URL from metaflow.package import MetaflowPackage -from metaflow.plugins.environment_decorator import EnvironmentDecorator -from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator # TODO: Move production_token to utils from metaflow.plugins.aws.step_functions.production_token import ( @@ -20,8 +18,10 @@ new_token, store_token, ) -from metaflow.util import get_username, to_bytes, to_unicode 
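
A minimal sketch (not part of this patch) of how the fluent builders defined above -- EventDependency, Trigger, TriggerTemplate, ArgoWorkflowTrigger, and TriggerParameter -- might be chained to describe an Argo Events trigger. It assumes the classes above are in scope; the dependency name, event source, data key, and destination path are all hypothetical placeholders.

# Hypothetical composition of a trigger using the builder classes above.
param = (
    TriggerParameter()
    .src(
        dependency_name="dep-foo",                 # hypothetical EventDependency name
        data_key="body.payload.alpha",             # hypothetical field in the event body
        value="default-value",                     # fallback used when the field is missing
    )
    .dest("spec.arguments.parameters.0.value")     # hypothetical location in the workflow spec
)

dependency = (
    EventDependency("dep-foo")
    .event_source_name("argo-events-webhook")      # hypothetical EventSource name
    .event_name("metaflow-event")                  # hypothetical event name
)

trigger = Trigger().template(
    TriggerTemplate("fooflow-trigger").argo_workflow_trigger(
        ArgoWorkflowTrigger().parameters([param])
    )
)

print(trigger)      # __str__ dumps the accumulated JSON payload
print(dependency)
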
+from metaflow.plugins.environment_decorator import EnvironmentDecorator +from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator from metaflow.tagging_util import validate_tags +from metaflow.util import get_username, to_bytes, to_unicode from .argo_workflows import ArgoWorkflows @@ -55,7 +55,7 @@ def cli(): default=None, type=str, help="Argo Workflow name. The flow name is used instead if " - "this option is not specified", + "this option is not specified.", ) @click.pass_obj def argo_workflows(obj, name=None): @@ -129,6 +129,12 @@ def argo_workflows(obj, name=None): "are processed first if Argo Workflows controller is configured to process limited " "number of workflows in parallel", ) +@click.option( + "--auto-emit-argo-events/--no-auto-emit-argo-events", + default=True, # TODO: Default to a value from config + show_default=True, + help="Auto emits Argo Events when the run completes successfully", +) @click.pass_obj def create( obj, @@ -141,6 +147,7 @@ def create( max_workers=None, workflow_timeout=None, workflow_priority=None, + auto_emit_argo_events=False, ): validate_tags(tags) @@ -172,10 +179,12 @@ def create( max_workers, workflow_timeout, workflow_priority, + auto_emit_argo_events, ) if only_json: obj.echo_always(str(flow), err=False, no_bold=True) + # TODO: Support echo-ing Argo Events Sensor template else: flow.deploy() obj.echo( @@ -196,6 +205,8 @@ def create( obj.echo("What will trigger execution of the workflow:", bold=True) obj.echo(flow.trigger_explanation(), indent=True) + # TODO: Print events emitted by execution of this flow + # response = ArgoWorkflows.trigger(obj.workflow_name) # run_id = "argo-" + response["metadata"]["name"] @@ -325,7 +336,15 @@ def resolve_workflow_name(obj, name): def make_flow( - obj, token, name, tags, namespace, max_workers, workflow_timeout, workflow_priority + obj, + token, + name, + tags, + namespace, + max_workers, + workflow_timeout, + workflow_priority, + auto_emit_argo_events, ): # TODO: Make this check less specific to Amazon S3 as we introduce # support for more cloud object stores. 
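
Once a flow is deployed with --auto-emit-argo-events (the default above), every successfully finished task publishes an event named metaflow.<flow or project_flow_name>.<step>, which a downstream flow can subscribe to via the @trigger decorator introduced later in this patch. A hypothetical end-to-end sketch, assuming an upstream FooFlow and that the decorator is importable from the top-level metaflow package:

from metaflow import FlowSpec, current, step, trigger


# Hypothetical downstream flow reacting to FooFlow's auto-emitted "end" event.
@trigger(event="metaflow.FooFlow.end")
class BarFlow(FlowSpec):
    @step
    def start(self):
        # current.trigger is populated by the decorator hook added in this patch;
        # it is None when the run was started manually.
        if current.trigger:
            print("Triggered by:", current.trigger.event)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    BarFlow()
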
@@ -371,6 +390,7 @@ def make_flow( username=get_username(), workflow_timeout=workflow_timeout, workflow_priority=workflow_priority, + auto_emit_argo_events=auto_emit_argo_events, ) diff --git a/metaflow/plugins/argo/argo_workflows_decorator.py b/metaflow/plugins/argo/argo_workflows_decorator.py index 26e72cfa807..8c0abede698 100644 --- a/metaflow/plugins/argo/argo_workflows_decorator.py +++ b/metaflow/plugins/argo/argo_workflows_decorator.py @@ -1,13 +1,21 @@ import json import os +import time +from metaflow import current from metaflow.decorators import StepDecorator +from metaflow.events import Trigger from metaflow.metadata import MetaDatum +from metaflow.metaflow_config import ARGO_EVENTS_WEBHOOK_URL + +from .argo_events import ArgoEvent class ArgoWorkflowsInternalDecorator(StepDecorator): name = "argo_workflows_internal" + defaults = {"auto-emit-argo-events": True} + def task_pre_step( self, step_name, @@ -23,10 +31,47 @@ def task_pre_step( inputs, ): self.task_id = task_id + self.run_id = run_id + + triggers = [] + # Expose event triggering metadata through current singleton + for key, payload in os.environ.items(): + if key.startswith("METAFLOW_ARGO_EVENT_PAYLOAD_"): + if payload != "null": # Argo-Workflow's None + try: + payload = json.loads(payload) + except (TypeError, ValueError) as e: + # There could be arbitrary events that Metaflow doesn't know of + payload = {} + triggers.append( + { + "timestamp": payload.get("timestamp"), + "id": payload.get("id"), + "name": payload.get("name"), # will exist since filter + "type": key[len("METAFLOW_ARGO_EVENT_PAYLOAD_") :].split( + "_", 1 + )[ + 0 + ] # infer type from env var key + # Add more event metadata here in the future + } + ) + meta = {} + if triggers: + # Enable current.trigger + current._update_env({"trigger": Trigger(triggers)}) + # Luckily there aren't many events for us to be concerned about the + # size of the metadata field yet! However we don't really need this + # metadata outside of the start step so we can save a few bytes in the + # db. + if step_name == "start": + meta["execution-triggers"] = json.dumps(triggers) + meta["argo-workflow-template"] = os.environ["ARGO_WORKFLOW_TEMPLATE"] meta["argo-workflow-name"] = os.environ["ARGO_WORKFLOW_NAME"] meta["argo-workflow-namespace"] = os.environ["ARGO_WORKFLOW_NAMESPACE"] + meta["auto-emit-argo-events"] = self.attributes["auto-emit-argo-events"] entries = [ MetaDatum( field=k, value=v, type=k, tags=["attempt_id:{0}".format(retry_count)] @@ -61,3 +106,44 @@ def task_finished( # by the next task here. with open("/mnt/out/task_id", "w") as file: file.write(self.task_id) + + # Emit Argo Events given that the flow has succeeded. Given that we only + # emit events when the task succeeds, we can piggy back on this decorator + # hook which is guaranteed to execute only after rest of the task has + # finished execution. + + if self.attributes["auto-emit-argo-events"]: + # Event name is set to metaflow.project.branch.step so that users can + # place explicit dependencies on namespaced events. Also, argo events + # sensors don't allow for filtering against absent fields - which limits + # our ability to subset non-project namespaced events. + # TODO: Check length limits for fields in Argo Events + event = ArgoEvent( + name="metaflow.%s.%s" + % (current.get("project_flow_name", flow.name), step_name) + ) + # There should only be one event generated even when the task is retried. + # Take care to only add to the list and not modify existing values. 
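            # Illustrative sketch (not part of this patch): the payload assembled by the
            # calls below ends up roughly shaped like
            #   {
            #       "id": "FooFlow/argo-xyz/end/t-123",        # same as pathspec
            #       "pathspec": "FooFlow/argo-xyz/end/t-123",
            #       "flow_name": "FooFlow",
            #       "run_id": "argo-xyz",
            #       "step_name": "end",
            #       "task_id": "t-123",
            #       "auto-generated-by-metaflow": True,
            #   }
            # plus the @project fields when present; all values above are hypothetical.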
+ event.add_to_payload("id", current.pathspec) + event.add_to_payload("pathspec", current.pathspec) + event.add_to_payload("flow_name", flow.name) + event.add_to_payload("run_id", self.run_id) + event.add_to_payload("step_name", step_name) + event.add_to_payload("task_id", self.task_id) + # Add @project decorator related fields. These are used to subset + # @trigger_on_finish related filters. + for key in ( + "project_name", + "branch_name", + "is_user_branch", + "is_production", + "project_flow_name", + ): + if current.get(key): + event.add_to_payload(key, current.get(key)) + # Add more fields here... + event.add_to_payload("auto-generated-by-metaflow", True) + # Keep in mind that any errors raised here will fail the run but the task + # will still be marked as success. That's why we explicitly swallow any + # errors and instead print them to std.err. + event.publish(ignore_errors=True) diff --git a/metaflow/plugins/argo/process_input_paths.py b/metaflow/plugins/argo/process_input_paths.py index e94b5a2669f..c74b8e73e6d 100644 --- a/metaflow/plugins/argo/process_input_paths.py +++ b/metaflow/plugins/argo/process_input_paths.py @@ -1,5 +1,5 @@ -import sys import re +import sys def process_input_paths(input_paths): diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index 0d2b793e8e4..5119e34e422 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -1,33 +1,33 @@ -import os -from collections import defaultdict -import sys import hashlib import json -import time -import string +import os import random +import string +import sys +import time import uuid +from collections import defaultdict -from metaflow.exception import MetaflowException, MetaflowInternalError -from metaflow.plugins.aws.batch.batch_decorator import BatchDecorator -from metaflow.plugins.resources_decorator import ResourcesDecorator -from metaflow.plugins.retry_decorator import RetryDecorator -from metaflow.parameters import deploy_time_eval +from metaflow import R from metaflow.decorators import flow_decorators -from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase +from metaflow.exception import MetaflowException, MetaflowInternalError from metaflow.metaflow_config import ( - SFN_IAM_ROLE, EVENTS_SFN_ACCESS_IAM_ROLE, + S3_ENDPOINT_URL, SFN_DYNAMO_DB_TABLE, SFN_EXECUTION_LOG_GROUP_ARN, - S3_ENDPOINT_URL, + SFN_IAM_ROLE, ) -from metaflow import R +from metaflow.parameters import deploy_time_eval +from metaflow.plugins.aws.batch.batch_decorator import BatchDecorator +from metaflow.plugins.resources_decorator import ResourcesDecorator +from metaflow.plugins.retry_decorator import RetryDecorator +from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase -from .step_functions_client import StepFunctionsClient -from .event_bridge_client import EventBridgeClient -from ..batch.batch import Batch from ..aws_utils import compute_resource_attributes +from ..batch.batch import Batch +from .event_bridge_client import EventBridgeClient +from .step_functions_client import StepFunctionsClient class StepFunctionsException(MetaflowException): @@ -227,6 +227,13 @@ def get_existing_deployment(cls, name): return None def _compile(self): + if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get( + "trigger_on_finish" + ): + raise StepFunctionsException( + "Deploying flows with @trigger or @trigger_on_finish decorator(s) " + "to AWS Step 
Functions is not supported currently." + ) # Visit every node of the flow and recursively build the state machine. def _visit(node, workflow, exit_node=None): if node.parallel_foreach: diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py new file mode 100644 index 00000000000..90ab6c9293c --- /dev/null +++ b/metaflow/plugins/events_decorator.py @@ -0,0 +1,298 @@ +import json +import time +import re + +from metaflow import current +from metaflow.decorators import FlowDecorator +from metaflow.exception import MetaflowException +from metaflow.util import is_stringish + +# TODO: Support dynamic parameter mapping through a context object that exposes +# flow name and user name similar to parameter context + + +class TriggerDecorator(FlowDecorator): + """ + Specifies the event(s) that this flow depends on. + + ``` + @trigger(event='foo') + ``` + or + ``` + @trigger(events=['foo', 'bar']) + ``` + + Additionally, you can specify the parameter mappings + to map event payload to Metaflow parameters for the flow. + ``` + @trigger(event={'name':'foo', 'parameters':{'my_param': 'event_field'}) + ``` + or + ``` + @trigger(events=[{'name':'foo', 'parameters':{'my_param_1': 'event_field_1'}, + {'name':'bar', 'parameters':{'my_param_2': 'event_field_2'}]) + ``` + + Parameters + ---------- + event : Union[str, dict], optional + Event dependency for this flow. + events : List[Union[str, dict]], optional + Events dependency for this flow. + options : dict, optional + Backend-specific configuration for tuning eventing behavior. + """ + + name = "trigger" + defaults = { + "event": None, + "events": [], + "options": {}, + } + + def flow_init( + self, + flow_name, + graph, + environment, + flow_datastore, + metadata, + logger, + echo, + options, + ): + self.triggers = [] + if sum(map(bool, (self.attributes["event"], self.attributes["events"]))) > 1: + raise MetaflowException( + "Specify only one of *event* or *events* " + "attributes in *@trigger* decorator." + ) + elif self.attributes["event"]: + # event attribute supports the following formats - + # 1. event='table.prod_db.members' + # 2. event={'name': 'table.prod_db.members', + # 'parameters': {'alpha': 'member_weight'}} + if is_stringish(self.attributes["event"]): + self.triggers.append({"name": str(self.attributes["event"])}) + elif isinstance(self.attributes["event"], dict): + if "name" not in dict(self.attributes["event"]): + raise MetaflowException( + "The *event* attribute for *@trigger* is missing the " + "*name* key." + ) + self.triggers.append(self.attributes["event"]) + else: + raise MetaflowException( + "Incorrect format for *event* attribute in *@trigger* decorator. " + "Supported formats are string and dictionary - \n" + "@trigger(event='foo') or @trigger(event={'name': 'foo', " + "'parameters': {'alpha': 'beta'}})" + ) + elif self.attributes["events"]: + # events attribute supports the following formats - + # 1. events=[{'name': 'table.prod_db.members', + # 'parameters': {'alpha': 'member_weight'}}, + # {'name': 'table.prod_db.metadata', + # 'parameters': {'beta': 'grade'}}] + if isinstance(self.attributes["events"], list): + for event in self.attributes["events"]: + if is_stringish(event): + self.triggers.append({"name": str(event)}) + elif isinstance(event, dict): + if "name" not in dict(event): + raise MetaflowException( + "One or more events in *events* attribute for " + "*@trigger* are missing the *name* key." 
+ ) + self.triggers.append(event) + else: + raise MetaflowException( + "One or more events in *events* attribute in *@trigger* " + "decorator have an incorrect format. Supported format " + "is dictionary - \n" + "@trigger(events=[{'name': 'foo', 'parameters': {'alpha': " + "'beta'}}, {'name': 'bar', 'parameters': " + "{'gamma': 'kappa'}}])" + ) + else: + raise MetaflowException( + "Incorrect format for *events* attribute in *@trigger* decorator. " + "Supported format is list - \n" + "@trigger(events=[{'name': 'foo', 'parameters': {'alpha': " + "'beta'}}, {'name': 'bar', 'parameters': " + "{'gamma': 'kappa'}}])" + ) + + if not self.triggers: + raise MetaflowException("No event(s) specified in *@trigger* decorator.") + + # same event shouldn't occur more than once + names = [x["name"] for x in self.triggers] + if len(names) != len(set(names)): + raise MetaflowException( + "Duplicate event names defined in *@trigger* decorator." + ) + + self.options = self.attributes["options"] + + # TODO: Handle scenario for local testing using --trigger. + + +class TriggerOnFinishDecorator(FlowDecorator): + """ + Specifies the flow(s) that this flow depends on. + + ``` + @trigger_on_finish(flow='FooFlow') + ``` + or + ``` + @trigger_on_finish(flows=['FooFlow', 'BarFlow']) + ``` + This decorator respects the @project decorator and triggers the flow + when upstream runs within the same namespace complete successfully + + Additionally, you can specify project aware upstream flow dependencies + by specifying the fully qualified project_flow_name. + ``` + @trigger_on_finish(flow='my_project.branch.my_branch.FooFlow') + ``` + or + ``` + @trigger_on_finish(flows=['my_project.branch.my_branch.FooFlow', 'BarFlow']) + ``` + + Parameters + ---------- + flow : str, optional + Upstream flow dependency for this flow. + flows : List[str], optional + Upstream flow dependencies for this flow. + options : dict, optional + Backend-specific configuration for tuning eventing behavior. + """ + + name = "trigger_on_finish" + defaults = { + "flow": None, # flow_name or project_flow_name + "flows": [], # flow_names or project_flow_names + "options": {}, + } + options = { + "trigger": dict( + multiple=True, + default=None, + help="Specify run pathspec for testing @trigger_on_finish locally.", + ), + } + + def flow_init( + self, + flow_name, + graph, + environment, + flow_datastore, + metadata, + logger, + echo, + options, + ): + self.triggers = [] + if sum(map(bool, (self.attributes["flow"], self.attributes["flows"]))) > 1: + raise MetaflowException( + "Specify only one of *flow* or *flows* " + "attributes in *@trigger_on_finish* decorator." + ) + elif self.attributes["flow"]: + # flow supports the format @trigger_on_finish(flow='FooFlow') + if is_stringish(self.attributes["flow"]): + self.triggers.append( + { + "fq_name": self.attributes["flow"], + } + ) + else: + raise MetaflowException( + "Incorrect type for *flow* attribute in *@trigger_on_finish* " + " decorator. Supported type is string - \n" + "@trigger_on_finish(flow='FooFlow')" + ) + elif self.attributes["flows"]: + # flows attribute supports the following formats - + # 1. flows=['FooFlow', 'BarFlow'] + if isinstance(self.attributes["flows"], list): + for flow in self.attributes["flows"]: + if is_stringish(flow): + self.triggers.append( + { + "fq_name": flow, + } + ) + else: + raise MetaflowException( + "One or more flows in *flows* attribute in " + "*@trigger_on_finish* decorator have an incorrect type. 
" + "Supported type is string - \n" + "@trigger_on_finish(flows=['FooFlow', 'BarFlow']" + ) + else: + raise MetaflowException( + "Incorrect type for *flows* attribute in *@trigger_on_finish* " + "decorator. Supported type is list - \n" + "@trigger_on_finish(flows=['FooFlow', 'BarFlow']" + ) + + if not self.triggers: + raise MetaflowException( + "No flow(s) specified in *@trigger_on_finish* decorator." + ) + + # Make triggers @project aware + for trigger in self.triggers: + if trigger["fq_name"].count(".") == 0: + # fully qualified name is just the flow name + trigger["flow"] = trigger["fq_name"] + elif trigger["fq_name"].count(".") in (2, 3): + # fully qualified name is of the format - project.branch.flow_name + trigger["project"], tail = trigger["fq_name"].split(".", maxsplit=1) + trigger["branch"], trigger["flow"] = tail.rsplit(".", maxsplit=1) + else: + raise MetaflowException( + "Incorrect format for *flow* in *@trigger_on_finish* " + "decorator. Specify either just the *flow_name* or a fully " + "qualified name like *project_name.branch_name.flow_name*." + ) + # TODO: Also sanity check project and branch names + if not re.match(r"^[A-Za-z0-9_]+$", trigger["flow"]): + raise MetaflowException( + "Invalid flow name *%s* in *@trigger_on_finish* " + "decorator. Only alphanumeric characters and " + "underscores(_) are allowed." % trigger["flow"] + ) + + self.options = self.attributes["options"] + + # Handle scenario for local testing using --trigger. + self._option_values = options + if options["trigger"]: + from metaflow import Run + from metaflow.events import Trigger + + run_objs = [] + for run_pathspec in options["trigger"]: + if len(run_pathspec.split("/")) != 2: + raise MetaflowException( + "Incorrect format for run pathspec for *--trigger*. " + "Supported format is flow_name/run_id." + ) + run_obj = Run(run_pathspec, _namespace_check=False) + if not run_obj.successful: + raise MetaflowException( + "*--trigger* does not support runs that are not successful yet." 
+ ) + run_objs.append(run_obj) + current._update_env({"trigger": Trigger.from_runs(run_objs)}) + + def get_top_level_options(self): + return list(self._option_values.items()) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 2f9a24d9698..2c5a2bae569 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -7,30 +7,31 @@ from metaflow import current, util from metaflow.exception import MetaflowException from metaflow.metaflow_config import ( - SERVICE_HEADERS, - SERVICE_INTERNAL_URL, + ARGO_EVENTS_WEBHOOK_URL, + AWS_SECRETS_MANAGER_DEFAULT_REGION, + AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, + CARD_AZUREROOT, + CARD_GSROOT, CARD_S3ROOT, + DATASTORE_SYSROOT_AZURE, + DATASTORE_SYSROOT_GS, DATASTORE_SYSROOT_S3, DATATOOLS_S3ROOT, DEFAULT_AWS_CLIENT_PROVIDER, DEFAULT_METADATA, + DEFAULT_SECRETS_BACKEND_TYPE, KUBERNETES_SANDBOX_INIT_SCRIPT, KUBERNETES_FETCH_EC2_METADATA, S3_ENDPOINT_URL, - AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, - DATASTORE_SYSROOT_AZURE, - CARD_AZUREROOT, - CARD_GSROOT, - DATASTORE_SYSROOT_GS, - DEFAULT_SECRETS_BACKEND_TYPE, - AWS_SECRETS_MANAGER_DEFAULT_REGION, + SERVICE_HEADERS, + SERVICE_INTERNAL_URL, ) from metaflow.mflog import ( BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars, - tail_logs, get_log_tailer, + tail_logs, ) from .kubernetes_client import KubernetesClient @@ -157,7 +158,6 @@ def create_job( env=None, tolerations=None, ): - if env is None: env = {} @@ -238,6 +238,9 @@ def create_job( .environment_variable( "METAFLOW_INIT_SCRIPT", KUBERNETES_SANDBOX_INIT_SCRIPT ) + .environment_variable( + "METAFLOW_ARGO_EVENTS_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL + ) # Skip setting METAFLOW_DATASTORE_SYSROOT_LOCAL because metadata sync # between the local user instance and the remote Kubernetes pod # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes