From dd318b558813907bba88eeee23111c11a6719b9c Mon Sep 17 00:00:00 2001
From: savin
Date: Sun, 12 Feb 2023 23:36:36 -0800
Subject: [PATCH] WIP: Integrate with Argo Events

---
 metaflow/plugins/__init__.py                  |   1 +
 metaflow/plugins/argo/argo_client.py          |  63 +-
 .../plugins/argo/argo_events_decorator.py     |  48 ++
 metaflow/plugins/argo/argo_workflows.py       | 547 ++++++++++++++++--
 .../plugins/argo/argo_workflows_decorator.py  |   3 +
 metaflow/plugins/argo/test.py                 |  17 +
 6 files changed, 622 insertions(+), 57 deletions(-)
 create mode 100644 metaflow/plugins/argo/argo_events_decorator.py
 create mode 100644 metaflow/plugins/argo/test.py

diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py
index 16767a88237..e9d9df7e221 100644
--- a/metaflow/plugins/__init__.py
+++ b/metaflow/plugins/__init__.py
@@ -56,6 +56,7 @@
     ("conda_base", ".conda.conda_flow_decorator.CondaFlowDecorator"),
     ("schedule", ".aws.step_functions.schedule_decorator.ScheduleDecorator"),
     ("project", ".project_decorator.ProjectDecorator"),
+    ("trigger", ".argo.argo_events_decorator.ArgoEventsDecorator"),
 ]

 # Add environments here
diff --git a/metaflow/plugins/argo/argo_client.py b/metaflow/plugins/argo/argo_client.py
index 0f64ce0b02a..3cbed495d65 100644
--- a/metaflow/plugins/argo/argo_client.py
+++ b/metaflow/plugins/argo/argo_client.py
@@ -13,13 +13,13 @@ class ArgoClientException(MetaflowException):
 class ArgoClient(object):
     def __init__(self, namespace=None):
-        self._kubernetes_client = KubernetesClient()
+        self._client = KubernetesClient()
         self._namespace = namespace or "default"
         self._group = "argoproj.io"
         self._version = "v1alpha1"

     def get_workflow_template(self, name):
-        client = self._kubernetes_client.get()
+        client = self._client.get()
         try:
             return client.CustomObjectsApi().get_namespaced_custom_object(
                 group=self._group,
@@ -38,7 +38,7 @@ def get_workflow_template(self, name):
     def register_workflow_template(self, name, workflow_template):
         # Unfortunately, Kubernetes client does not handle optimistic
         # concurrency control by itself unlike kubectl
-        client = self._kubernetes_client.get()
+        client = self._client.get()
         try:
             workflow_template["metadata"][
                 "resourceVersion"
@@ -88,7 +88,7 @@ def register_workflow_template(self, name, workflow_template):
             )

     def trigger_workflow_template(self, name, parameters={}):
-        client = self._kubernetes_client.get()
+        client = self._client.get()
         body = {
             "apiVersion": "argoproj.io/v1alpha1",
             "kind": "Workflow",
@@ -119,7 +119,7 @@ def trigger_workflow_template(self, name, parameters={}):
     def schedule_workflow_template(self, name, schedule=None, timezone=None):
         # Unfortunately, Kubernetes client does not handle optimistic
         # concurrency control by itself unlike kubectl
-        client = self._kubernetes_client.get()
+        client = self._client.get()
         body = {
             "apiVersion": "argoproj.io/v1alpha1",
             "kind": "CronWorkflow",
@@ -181,3 +181,56 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
             raise ArgoClientException(
                 json.loads(e.body)["message"] if e.body is not None else e.reason
             )
+
+    def register_sensor(self, name, sensor=None):
+        # Unfortunately, the Kubernetes client does not handle optimistic
+        # concurrency control by itself, unlike kubectl.
+        client = self._client.get()
+        try:
+            sensor["metadata"][
+                "resourceVersion"
+            ] = client.CustomObjectsApi().get_namespaced_custom_object(
+                group=self._group,
+                version=self._version,
+                namespace=self._namespace,
+                plural="sensors",
+                name=name,
+            )[
+                "metadata"
+            ][
+                "resourceVersion"
+            ]
+        except client.rest.ApiException as e:
+            if e.status == 404:
+                try:
+                    return client.CustomObjectsApi().create_namespaced_custom_object(
+                        group=self._group,
+                        version=self._version,
+                        namespace=self._namespace,
+                        plural="sensors",
+                        body=sensor,
+                    )
+                except client.rest.ApiException as e:
+                    raise ArgoClientException(
+                        json.loads(e.body)["message"]
+                        if e.body is not None
+                        else e.reason
+                    )
+            else:
+                raise ArgoClientException(
+                    json.loads(e.body)["message"] if e.body is not None else e.reason
+                )
+        # TODO (savin): Implement sensor de-registration
+        try:
+            return client.CustomObjectsApi().replace_namespaced_custom_object(
+                group=self._group,
+                version=self._version,
+                namespace=self._namespace,
+                plural="sensors",
+                body=sensor,
+                name=name,
+            )
+        except client.rest.ApiException as e:
+            raise ArgoClientException(
+                json.loads(e.body)["message"] if e.body is not None else e.reason
+            )
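Note for reviewers: `register_sensor` follows the same fetch-resourceVersion-then-create-or-replace dance as `register_workflow_template`. A minimal sketch of the upsert pattern in isolation (the `get`/`create`/`replace` callables and `NotFoundError` stand in for the `CustomObjectsApi` calls and `ApiException` handling; they are illustrative, not part of this patch):

```python
class NotFoundError(Exception):
    """Stand-in for kubernetes.client.rest.ApiException with e.status == 404."""


def upsert(get, create, replace, name, obj):
    # Kubernetes rejects a replace whose metadata.resourceVersion is stale, so
    # copy the live object's resourceVersion first (optimistic concurrency
    # control) and fall back to create when the object does not exist yet.
    try:
        obj["metadata"]["resourceVersion"] = get(name)["metadata"]["resourceVersion"]
    except NotFoundError:
        return create(obj)
    return replace(name, obj)
```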
diff --git a/metaflow/plugins/argo/argo_events_decorator.py b/metaflow/plugins/argo/argo_events_decorator.py
new file mode 100644
index 00000000000..ae58d5164e7
--- /dev/null
+++ b/metaflow/plugins/argo/argo_events_decorator.py
@@ -0,0 +1,62 @@
+from metaflow.decorators import FlowDecorator
+from metaflow.exception import MetaflowException
+from metaflow.util import is_stringish
+
+
+# TODO: At some point, lift this decorator interface to be a top-level
+# decorator since the interface stays consistent for a similar implementation
+# for AWS Step Functions and Airflow.
+class ArgoEventsDecorator(FlowDecorator):
+
+    name = "trigger"
+    defaults = {
+        # TODO (savin): Introduce support for flow-dependencies.
+        "event": None,
+        "events": [],
+    }
+
+    def flow_init(
+        self, flow, graph, environment, flow_datastore, metadata, logger, echo, options
+    ):
+        self.events = []
+        if self.attributes["event"]:
+            # The event attribute supports the following formats -
+            #     1. event='table.prod_db.members'
+            #     2. event={'name': 'table.prod_db.members',
+            #               'parameters': {'alpha': 'member_weight'}}
+            if is_stringish(self.attributes["event"]):
+                self.events.append({"name": self.attributes["event"]})
+            elif isinstance(self.attributes["event"], dict):
+                if "name" not in dict(self.attributes["event"]):
+                    raise MetaflowException(
+                        "The *event* attribute for *@trigger* is missing the "
+                        "*name* key."
+                    )
+                self.events.append(dict(self.attributes["event"]))
+            else:
+                raise MetaflowException(
+                    "Incorrect format for the *event* attribute of the "
+                    "*@trigger* decorator. Supported formats are string and "
+                    "dictionary - \n"
+                    "@trigger(event='table.prod_db.members') or "
+                    "@trigger(event={'name': 'table.prod_db.members', "
+                    "'parameters': {'alpha': 'member_weight'}})"
+                )
+        # The events attribute accepts a list of items in the same two formats
+        # that the event attribute supports.
+        for event in self.attributes["events"]:
+            if is_stringish(event):
+                self.events.append({"name": event})
+            elif isinstance(event, dict):
+                if "name" not in dict(event):
+                    raise MetaflowException(
+                        "An event in the *events* attribute for *@trigger* is "
+                        "missing the *name* key."
+                    )
+                self.events.append(dict(event))
+            else:
+                raise MetaflowException(
+                    "Incorrect format for the *events* attribute of the "
+                    "*@trigger* decorator - it accepts a list of strings "
+                    "and/or dictionaries."
+                )
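Note for reviewers: with the FLOW_DECORATORS registration above, `@trigger` should be importable from the top-level `metaflow` package just like `@schedule` and `@project`. A sketch of the intended user experience (flow and parameter names are made up):

```python
from metaflow import FlowSpec, Parameter, step, trigger


@trigger(
    event={
        "name": "table.prod_db.members",
        "parameters": {"alpha": "member_weight"},
    }
)
class FreshDataFlow(FlowSpec):
    # 'alpha' is populated from the 'member_weight' field of the event payload
    # when triggered through Argo Events; the default applies everywhere else.
    alpha = Parameter("alpha", default=0.5, type=float)

    @step
    def start(self):
        print("triggered with alpha =", self.alpha)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    FreshDataFlow()
```

Deploying with `python fresh_data_flow.py argo-workflows create` would then register both the workflow template and the sensor compiled below.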
diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py
index e707461c8dc..4b7020ea22b 100644
--- a/metaflow/plugins/argo/argo_workflows.py
+++ b/metaflow/plugins/argo/argo_workflows.py
@@ -46,16 +46,15 @@ class ArgoWorkflowsSchedulingException(MetaflowException):

 # List of future enhancements -
 # 1. Configure Argo metrics.
-# 2. Support Argo Events.
-# 3. Support resuming failed workflows within Argo Workflows.
-# 4. Support gang-scheduled clusters for distributed PyTorch/TF - One option is to
+# 2. Support resuming failed workflows within Argo Workflows.
+# 3. Support gang-scheduled clusters for distributed PyTorch/TF - One option is to
 #    use volcano - https://github.com/volcano-sh/volcano/tree/master/example/integrations/argo
-# 5. Support GitOps workflows.
-# 6. Add Metaflow tags to labels/annotations.
-# 7. Support Multi-cluster scheduling - https://github.com/argoproj/argo-workflows/issues/3523#issuecomment-792307297
-# 8. Support for workflow notifications.
-# 9. Support R lang.
-# 10.Ping @savin at slack.outerbounds.co for any feature request.
+# 4. Support GitOps workflows.
+# 5. Add Metaflow tags to labels/annotations.
+# 6. Support Multi-cluster scheduling - https://github.com/argoproj/argo-workflows/issues/3523#issuecomment-792307297
+# 7. Support for workflow notifications.
+# 8. Support R lang.
+# 9. Ping @savin at slack.outerbounds.co for any feature request.


 class ArgoWorkflows(object):
@@ -122,17 +121,25 @@ def __init__(
         self.workflow_priority = workflow_priority

         self.parameters = self._process_parameters()
-        self._workflow_template = self._compile()
-        self._cron = self._get_cron()
+        self._schedule, self._timezone = self._get_schedule()
+
+        self._workflow_template = self._compile_workflow_template()
+        self._sensor = self._compile_sensor()

     def __str__(self):
         return str(self._workflow_template)

     def deploy(self):
         try:
+            # Register workflow template.
             ArgoClient(namespace=KUBERNETES_NAMESPACE).register_workflow_template(
                 self.name, self._workflow_template.to_json()
             )
+            # Register sensor. A sensor is compiled only for flows that carry
+            # the @trigger decorator - skip registration otherwise.
+            if self._sensor is not None:
+                ArgoClient(namespace=KUBERNETES_NAMESPACE).register_sensor(
+                    self.name, self._sensor.to_json()
+                )
         except Exception as e:
             raise ArgoWorkflowsException(str(e))

@@ -176,28 +183,24 @@ def trigger(cls, name, parameters=None):
         except Exception as e:
             raise ArgoWorkflowsException(str(e))

-    def _get_cron(self):
+    def _get_schedule(self):
         schedule = self.flow._flow_decorators.get("schedule")
         if schedule:
             # Remove the field "Year" if it exists
             schedule = schedule[0]
             return " ".join(schedule.schedule.split()[:5]), schedule.timezone
-        return None
+        return None, None

     def schedule(self):
         try:
-            if self._cron is None:
-                cron, timezone = None, None
-            else:
-                cron, timezone = self._cron
             ArgoClient(namespace=KUBERNETES_NAMESPACE).schedule_workflow_template(
-                self.name, cron, timezone
+                self.name, self._schedule, self._timezone
             )
         except Exception as e:
             raise ArgoWorkflowsSchedulingException(str(e))

     def trigger_explanation(self):
-        if self._cron:
+        if self.flow._flow_decorators.get("schedule"):
             return (
                 "This workflow triggers automatically via the CronWorkflow *%s*."
                 % self.name
@@ -228,8 +231,8 @@ def get_existing_deployment(cls, name):
         return None

     def _process_parameters(self):
-        parameters = []
-        has_schedule = self._get_cron() is not None
+        parameters = {}
+        has_schedule = self.flow._flow_decorators.get("schedule") is not None
         seen = set()
         for var, param in self.flow._get_parameters():
             # Throw an exception if the parameter is specified twice.
@@ -247,24 +250,29 @@ def _process_parameters(self):
             # parameters with no defaults. We currently don't have any notion
             # of data triggers in Argo Workflows.
-            # TODO: Support Argo Events for data triggering in the near future.
             if "default" not in param.kwargs and is_required and has_schedule:
                 raise MetaflowException(
                     "The parameter *%s* does not have a default and is required. "
                     "Scheduling such parameters via Argo CronWorkflows is not "
                     "currently supported." % param.name
                 )
+            # TODO (savin): Check if the trigger decorator specifies a required
+            #               but no-default parameter.
             value = deploy_time_eval(param.kwargs.get("default"))
             # If the value is not required and the value is None, we set the value to
             # the JSON equivalent of None to please argo-workflows.
             if not is_required or value is not None:
                 value = json.dumps(value)
-            parameters.append(
-                dict(name=param.name, value=value, description=param.kwargs.get("help"))
+            parameters[param.name] = dict(
+                name=param.name,
+                value=value,
+                description=param.kwargs.get("help"),
+                is_required=is_required,
             )
         return parameters

-    def _compile(self):
+    def _compile_workflow_template(self):
         # This method compiles a Metaflow FlowSpec into Argo WorkflowTemplate
         #
         # WorkflowTemplate
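Note for reviewers: the switch from a list to a dict keyed by parameter name is load-bearing - `_compile_sensor` below resolves parameters referenced by an event via `self.parameters[parameter_name]`. Roughly, the structure now looks like this (illustrative values):

```python
# Shape of self.parameters after _process_parameters() for a flow with a
# single optional parameter 'alpha':
parameters = {
    "alpha": {
        "name": "alpha",
        "value": "0.5",  # JSON-encoded, deploy-time-evaluated default
        "description": "model weight",
        "is_required": False,
    }
}

# The sensor compiler can now look up an event-mapped parameter by name,
# e.g. to record its default value in a TriggerParameter:
default_value = parameters["alpha"]["value"]
```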
@@ -367,7 +375,7 @@ def _compile_workflow_template(self):
                     .value(parameter["value"])
                     .description(parameter.get("description"))
                     # TODO: Better handle IncludeFile in Argo Workflows UI.
-                    for parameter in self.parameters
+                    for parameter in self.parameters.values()
                 ]
             )
         )
@@ -602,13 +610,27 @@ def _container_templates(self):
             task_id = "$METAFLOW_TASK_ID"

             # Resolve retry strategy.
-            (
-                user_code_retries,
-                total_retries,
-                retry_count,
-                minutes_between_retries,
-            ) = self._get_retries(node)
+            max_user_code_retries = 0
+            max_error_retries = 0
+            minutes_between_retries = "2"
+            for decorator in node.decorators:
+                if decorator.name == "retry":
+                    minutes_between_retries = decorator.attributes.get(
+                        "minutes_between_retries", minutes_between_retries
+                    )
+                    user_code_retries, error_retries = decorator.step_task_retry_count()
+                    max_user_code_retries = max(max_user_code_retries, user_code_retries)
+                    max_error_retries = max(max_error_retries, error_retries)
+
+            user_code_retries = max_user_code_retries
+            total_retries = max_user_code_retries + max_error_retries
+            # {{retries}} is only available if retryStrategy is specified
+            retry_count = (
+                "{{retries}}" if max_user_code_retries + max_error_retries else 0
+            )
+            minutes_between_retries = int(minutes_between_retries)

+            # Configure log capture.
             mflog_expr = export_mflog_env_vars(
                 datastore_type=self.flow_datastore.TYPE,
                 stdout_path="$PWD/.logs/mflog_stdout",
@@ -681,7 +703,7 @@ def _container_templates(self):
                         # {{foo.bar['param_name']}}
                         "--%s={{workflow.parameters.%s}}"
                         % (parameter["name"], parameter["name"])
-                        for parameter in self.parameters
+                        for parameter in self.parameters.values()
                     ]
                 )
             if self.tags:
@@ -858,11 +880,11 @@ def _container_templates(self):

             # It makes no sense to set env vars to None (shows up as "None" string)
             # Also we skip some env vars (e.g. in case we want to pull them from KUBERNETES_SECRETS)
-            env_vars_to_skip = set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
             env = {
                 k: v
                 for k, v in env.items()
-                if v is not None and k not in env_vars_to_skip
+                if v is not None
+                and k not in set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
             }

             # Create a ContainerTemplate for this node. Ideally, we would have
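Note for reviewers: the inlined logic above computes the same four values that `_get_retries` used to return. For a step carrying `@retry(times=3, minutes_between_retries=10)` the values work out as below; the `retry_strategy` dict is a sketch of the Argo `retryStrategy` these values drive (field names follow Argo's RetryStrategy API, but treat the exact payload as illustrative, not what this patch emits verbatim):

```python
user_code_retries = 3
total_retries = 3  # user-code retries plus platform-error retries
retry_count = "{{retries}}"  # Argo substitutes the current attempt number
minutes_between_retries = 10

# Sketch of the corresponding retryStrategy on the container template:
retry_strategy = {
    "retryPolicy": "Always",
    "limit": total_retries,
    "backoff": {"duration": "%sm" % minutes_between_retries},
}
```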
@@ -981,25 +1003,200 @@ def _container_templates(self):
             )
         )

-    def _get_retries(self, node):
-        max_user_code_retries = 0
-        max_error_retries = 0
-        minutes_between_retries = "2"
-        for deco in node.decorators:
-            if deco.name == "retry":
-                minutes_between_retries = deco.attributes.get(
-                    "minutes_between_retries", minutes_between_retries
-                )
-                user_code_retries, error_retries = deco.step_task_retry_count()
-                max_user_code_retries = max(max_user_code_retries, user_code_retries)
-                max_error_retries = max(max_error_retries, error_retries)
+    def _compile_sensor(self):
+        # This method compiles a Metaflow @trigger decorator into an Argo
+        # Events Sensor.
+        #
+        # The event payload is assumed to be shaped as -
+        # ------------------------------------------------------------------
+        # | name              | name of the event                          |
+        # | payload           |                                            |
+        # |   parameter name  | parameter value                            |
+        # |   parameter name  | parameter value                            |
+        # |   ...             | ...                                        |
+        # ------------------------------------------------------------------
+        #
+        # At the moment, every event-triggered workflow template has a
+        # dedicated sensor (which can potentially be a bit wasteful in
+        # scenarios with a high volume of workflows and a low volume of
+        # events) - introducing a many-to-one sensor-to-workflow-template
+        # solution is completely in the realm of possibilities (modulo
+        # consistency and transactional guarantees).
+        #
+        # This implementation side-steps the more prominent/popular usage of
+        # event sensors, where the sensor is responsible for submitting the
+        # workflow object directly. Instead, we construct the equivalent
+        # behavior of `argo submit --from` to reference an already submitted
+        # workflow template. This ensures that Metaflow-generated Kubernetes
+        # objects can be easily reasoned about.
+        #
+        # At the moment, Metaflow configures for webhook and NATS event
+        # sources. If you are interested in the HA story for either, please
+        # follow https://argoproj.github.io/argo-events/eventsources/ha/.
+        #
+        # There is some potential for confusion between Metaflow concepts and
+        # Argo Events concepts -
+        # 1. An Argo Events EventSource defines an event name which is
+        #    different from the Metaflow event name - think of the Argo Events
+        #    name as a type of event (conceptually like topics in Kafka) while
+        #    Metaflow event names are a field within the Argo event.
+        #
+        # TODO: 1. Ensure sensors are not running in a best-effort QoS class.
+        #
+        # At the moment, there is parity between the labels and annotations
+        # for workflow templates and sensors - that may or may not be the case
+        # in the future.

+        has_trigger = self.flow._flow_decorators.get("trigger") is not None
+        # Nothing to do here - let's short-circuit and exit.
+        if not has_trigger:
+            return None
+
+        events = self.flow._flow_decorators.get("trigger")[0].events
+
+        labels = {"app.kubernetes.io/part-of": "metaflow"}
+
+        annotations = {
+            "metaflow/production_token": self.production_token,
+            "metaflow/owner": self.username,
+            "metaflow/user": "argo-workflows",
+            "metaflow/flow_name": self.flow.name,
+        }
+        if current.get("project_name"):
+            annotations.update(
+                {
+                    "metaflow/project_name": current.project_name,
+                    "metaflow/branch_name": current.branch_name,
+                    "metaflow/project_flow_name": current.project_flow_name,
+                }
+            )

         return (
-            max_user_code_retries,
-            max_user_code_retries + max_error_retries,
-            # {{retries}} is only available if retryStrategy is specified
-            "{{retries}}" if max_user_code_retries + max_error_retries else 0,
-            int(minutes_between_retries),
+            Sensor()
+            .metadata(
+                # Sensor metadata.
+                ObjectMeta()
+                .name(self.name)
+                .namespace(KUBERNETES_NAMESPACE)
+                .label("app.kubernetes.io/name", "metaflow-sensor")
+                .label("app.kubernetes.io/part-of", "metaflow")
+                .annotations(annotations)
+            )
+            .spec(
+                SensorSpec()
+                .template(
+                    # Sensor template.
+                    SensorTemplate()
+                    # TODO (savin): Make this configurable before shipping.
+                    .service_account_name("operate-workflow-sa")
+                    # TODO (savin): Run the sensor in guaranteed QoS.
+                )
+                # Set sensor replicas to 1 for now.
+                # TODO (savin): Allow multiple replicas for HA.
+                .replicas(1)
+                # TODO: Support revision history limit to manage old deployments.
+                # .revision_history_limit(...)
+                # Workflow trigger.
+                .trigger(
+                    Trigger().template(
+                        TriggerTemplate(self.name)
+                        # Trigger a deployed workflow template.
+                        .argo_workflow_trigger(
+                            ArgoWorkflowTrigger()
+                            .source(
+                                {
+                                    "resource": {
+                                        "apiVersion": "argoproj.io/v1alpha1",
+                                        "kind": "Workflow",
+                                        "metadata": {
+                                            "generateName": "%s-" % self.name,
+                                            "namespace": KUBERNETES_NAMESPACE,
+                                        },
+                                        "spec": {
+                                            "arguments": {
+                                                "parameters": [
+                                                    Parameter(parameter["name"])
+                                                    .value(parameter["value"])
+                                                    .to_json()
+                                                    for parameter in self.parameters.values()
+                                                ]
+                                            },
+                                            "workflowTemplateRef": {
+                                                "name": self.name,
+                                            },
+                                        },
+                                    }
+                                }
+                            )
+                            .parameters(
+                                [
+                                    TriggerParameter()
+                                    .src(
+                                        dependency_name=event["name"],
+                                        data_key="body.payload.%s" % field,
+                                        # Unfortunately, the sensor needs to
+                                        # record the default values for the
+                                        # parameters - there doesn't seem to
+                                        # be any way for us to skip them.
+                                        value=self.parameters[parameter_name]["value"],
+                                    )
+                                    .dest(
+                                        "spec.arguments.parameters.#(name=%s).value"
+                                        % parameter_name
+                                    )
+                                    for event in events
+                                    for field, parameter_name in event.get(
+                                        "parameters", {}
+                                    ).items()
+                                ]
+                            )
+                        )
+                    )
+                )
+                # Event dependencies.
+                .dependencies(
+                    EventDependency(event["name"])
+                    .event_name("event")
+                    .event_source_name("metaflow-webhook")
+                    .filters(
+                        # Ensure that the event name matches and that every
+                        # required parameter field is present in the payload.
+                        EventDependencyFilter().exprs(
+                            [
+                                {
+                                    "expr": "name == '%s'" % event["name"],
+                                    "fields": [
+                                        {"name": "name", "path": "body.name"}
+                                    ],
+                                }
+                            ]
+                            + [
+                                {
+                                    # Trivially-true expression - it only
+                                    # asserts that the field is present.
+                                    "expr": "true == true",
+                                    "fields": [
+                                        {
+                                            "name": "field",
+                                            "path": "body.payload.%s" % field,
+                                        }
+                                    ],
+                                }
+                                for field, parameter_name in event.get(
+                                    "parameters", {}
+                                ).items()
+                                if self.parameters[parameter_name]["is_required"]
+                            ]
+                        )
+                    )
+                    for event in events
+                )
+            )
+        )
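Note for reviewers: tying the pieces together, a webhook delivery for the event named `table.prod_db.members` carrying one parameter field would look like the body below - `body.name` is matched by the name filter and the `body.payload.*` paths feed the TriggerParameters (values are made up):

```python
import json

event_body = {
    # Matched by the "name == 'table.prod_db.members'" expr filter.
    "name": "table.prod_db.members",
    "payload": {
        # Mapped onto the flow parameter via a TriggerParameter dataKey.
        "member_weight": 0.7,
    },
}
print(json.dumps(event_body, indent=2))
```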
@@ -1424,3 +1621,249 @@ def to_json(self):

     def __str__(self):
         return json.dumps(self.payload, indent=4)
+
+
+class Sensor(object):
+    # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Sensor
+
+    def __init__(self):
+        tree = lambda: defaultdict(tree)
+        self.payload = tree()
+        self.payload["apiVersion"] = "argoproj.io/v1alpha1"
+        self.payload["kind"] = "Sensor"
+
+    def metadata(self, object_meta):
+        self.payload["metadata"] = object_meta.to_json()
+        return self
+
+    def spec(self, sensor_spec):
+        self.payload["spec"] = sensor_spec.to_json()
+        return self
+
+    def to_json(self):
+        return self.payload
+
+    def __str__(self):
+        return json.dumps(self.payload, indent=4)
+
+
+class SensorSpec(object):
+    # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.SensorSpec
+
+    def __init__(self):
+        tree = lambda: defaultdict(tree)
+        self.payload = tree()
+
+    def replicas(self, replicas=1):
+        # TODO: Make the number of deployment replicas configurable.
+        self.payload["replicas"] = int(replicas)
+        return self
+
+    def template(self, sensor_template):
+        self.payload["template"] = sensor_template.to_json()
+        return self
+
+    def trigger(self, trigger):
+        if "triggers" not in self.payload:
+            self.payload["triggers"] = []
+        self.payload["triggers"].append(trigger.to_json())
+        return self
+
+    def dependencies(self, dependencies):
+        if "dependencies" not in self.payload:
+            self.payload["dependencies"] = []
+        for dependency in dependencies:
+            self.payload["dependencies"].append(dependency.to_json())
+        return self
+
+    def event_bus_name(self, event_bus_name):
+        self.payload["eventBusName"] = event_bus_name
+        return self
+
+    def to_json(self):
+        return self.payload
+
+    def __str__(self):
+        return json.dumps(self.to_json(), indent=4)
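Note for reviewers: these builders are thin accumulators over nested defaultdicts; `to_json()` hands back the dict that `register_sensor` posts verbatim. A bare skeleton composes like this (reusing `ObjectMeta`, which this module already defines; values are illustrative):

```python
from metaflow.plugins.argo.argo_workflows import (
    ObjectMeta,
    Sensor,
    SensorSpec,
    SensorTemplate,
)

sensor = (
    Sensor()
    .metadata(ObjectMeta().name("flow-sensor").namespace("default"))
    .spec(SensorSpec().replicas(1).template(SensorTemplate()))
)
# sensor.to_json() now yields roughly:
# {"apiVersion": "argoproj.io/v1alpha1", "kind": "Sensor",
#  "metadata": {...}, "spec": {"replicas": 1, "template": {}}}
```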
+
+
+class SensorTemplate(object):
+    # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Template
+
+    def __init__(self):
+        tree = lambda: defaultdict(tree)
+        self.payload = tree()
+
+    def service_account_name(self, service_account_name):
+        self.payload["serviceAccountName"] = service_account_name
+        return self
+
+    # Introduce more fields as needed. Remember to set image_pull_secrets in
+    # this template when configuring container image pull secrets.
+
+    # TODO (savin): Introduce the fields needed to ensure that the container
+    #               doesn't run in best-effort QoS.
+
+    def to_json(self):
+        return self.payload
+
+    def __str__(self):
+        return json.dumps(self.to_json(), indent=4)
+
+
+class EventDependency(object):
+    # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.EventDependency
+
+    def __init__(self, name):
+        tree = lambda: defaultdict(tree)
+        self.payload = tree()
+        self.payload["name"] = name
+
+    def event_source_name(self, event_source_name):
+        self.payload["eventSourceName"] = event_source_name
+        return self
+
+    def event_name(self, event_name):
+        self.payload["eventName"] = event_name
+        return self
+
+    def filters(self, event_dependency_filter):
+        self.payload["filters"] = event_dependency_filter.to_json()
+        return self
+
+    def transform(self, event_dependency_transformer=None):
+        if event_dependency_transformer:
+            self.payload["transform"] = event_dependency_transformer
+        return self
+
+    def filters_logical_operator(self, logical_operator):
+        # LogicalOperator is a plain string ("and"/"or") in Argo Events.
+        self.payload["filtersLogicalOperator"] = logical_operator
+        return self
+
+    def to_json(self):
+        return self.payload
+
+    def __str__(self):
+        return json.dumps(self.to_json(), indent=4)
+
+
+class EventDependencyFilter(object):
+    # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.EventDependencyFilter
+
+    def __init__(self):
+        tree = lambda: defaultdict(tree)
+        self.payload = tree()
+
+    def exprs(self, exprs):
+        self.payload["exprs"] = exprs
+        return self
+
+    def to_json(self):
+        return self.payload
+
+    def __str__(self):
+        return json.dumps(self.to_json(), indent=4)
+
+
+class Trigger(object):
+    # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Trigger
+
+    def __init__(self):
+        tree = lambda: defaultdict(tree)
+        self.payload = tree()
+
+    def template(self, trigger_template):
+        self.payload["template"] = trigger_template.to_json()
+        return self
+
+    def parameters(self, trigger_parameters):
+        if "parameters" not in self.payload:
+            self.payload["parameters"] = []
+        for trigger_parameter in trigger_parameters:
+            self.payload["parameters"].append(trigger_parameter.to_json())
+        return self
+
+    def policy(self, trigger_policy):
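Note for reviewers: for a single event with one required parameter, the `exprs` list assembled in `_compile_sensor` reduces to the following (event and field names are made up):

```python
exprs = [
    # The event name must match exactly.
    {
        "expr": "name == 'table.prod_db.members'",
        "fields": [{"name": "name", "path": "body.name"}],
    },
    # Trivially-true expression - its only purpose is to make the filter
    # fail when the referenced payload path is absent.
    {
        "expr": "true == true",
        "fields": [{"name": "field", "path": "body.payload.member_weight"}],
    },
]
```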
self.payload["policy"] = trigger_policy.to_json() + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.to_json(), indent=4) + + +class TriggerTemplate(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.TriggerTemplate + + def __init__(self, name): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["name"] = name + + def argo_workflow_trigger(self, argo_workflow_trigger): + self.payload["argoWorkflow"] = argo_workflow_trigger.to_json() + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +class ArgoWorkflowTrigger(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.ArgoWorkflowTrigger + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + self.payload["operation"] = "submit" + self.payload["group"] = "argoproj.io" + self.payload["version"] = "v1alpha1" + self.payload["resource"] = "workflows" + + def source(self, source): + self.payload["source"] = source + return self + + def parameters(self, trigger_parameters): + if "parameters" not in self.payload: + self.payload["parameters"] = [] + for trigger_parameter in trigger_parameters: + self.payload["parameters"].append(trigger_parameter.to_json()) + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) + + +class TriggerParameter(object): + # https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.TriggerParameter + + def __init__(self): + tree = lambda: defaultdict(tree) + self.payload = tree() + + def src(self, dependency_name, data_key, value): + self.payload["src"] = { + "dependencyName": dependency_name, + "dataKey": data_key, + "value": value, + "useRawData": True, + } + return self + + def dest(self, dest): + self.payload["dest"] = dest + return self + + def to_json(self): + return self.payload + + def __str__(self): + return json.dumps(self.payload, indent=4) diff --git a/metaflow/plugins/argo/argo_workflows_decorator.py b/metaflow/plugins/argo/argo_workflows_decorator.py index 26e72cfa807..d0d66ad12eb 100644 --- a/metaflow/plugins/argo/argo_workflows_decorator.py +++ b/metaflow/plugins/argo/argo_workflows_decorator.py @@ -34,6 +34,9 @@ def task_pre_step( for k, v in meta.items() ] # Register book-keeping metadata for debugging. + + # TODO (savin): Also register Argo Events metadata if the flow was triggered + # through Argo Events. metadata.register_metadata(run_id, step_name, task_id, entries) def task_finished( diff --git a/metaflow/plugins/argo/test.py b/metaflow/plugins/argo/test.py new file mode 100644 index 00000000000..6f09e1ca327 --- /dev/null +++ b/metaflow/plugins/argo/test.py @@ -0,0 +1,17 @@ +import requests + + +class ArgoEventsClient(object): + + _payload = {} + + @classmethod + def add_to_event(cls, key, value): + cls._payload[key] = str(value) + + @classmethod + def emit_event(cls, name, params): + resp = requests.post( + url, headers={"content-type": "application/json"}, json=body + ) + resp.raise_for_status()