Skip to content

Commit

Permalink
auto-emit events
Browse files Browse the repository at this point in the history
  • Loading branch information
savingoyal committed Mar 1, 2023
1 parent 7edf947 commit 69473f8
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 31 deletions.
1 change: 0 additions & 1 deletion metaflow/plugins/argo/argo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class ArgoClientException(MetaflowException):

class ArgoClient(object):
def __init__(self, namespace=None):

self._client = KubernetesClient()
self._namespace = namespace or "default"
self._group = "argoproj.io"
Expand Down
38 changes: 38 additions & 0 deletions metaflow/plugins/argo/argo_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import urllib3
import json


class ArgoEvent(object):
    """A single Argo Event that can be published to an Argo Events webhook.

    The event carries a name and a string-valued payload dict. Payload entries
    can be accumulated via `add_to_payload` and/or supplied at `publish` time
    (publish-time entries win on key collision).
    """

    def __init__(self, name, payload=None):
        # TODO: Add guardrails for name if any
        self.name = name
        # NOTE: `payload=None` instead of `payload={}` — a mutable default
        # dict would be shared across every ArgoEvent instance, so payload
        # entries added to one event would silently leak into all others.
        self._payload = {} if payload is None else payload

    def add_to_payload(self, key, value):
        """Record `key` -> str(value) in the event payload.

        Values are always stringified. Returns `self` so calls can be chained.
        """
        self._payload[key] = str(value)
        return self

    def publish(self, payload=None, force=False):
        """Best-effort POST of this event to the Argo Events webhook.

        `payload` entries are merged over those added via `add_to_payload`.
        Any failure is caught and printed rather than raised.
        `force` is currently unused (see TODO below).
        """
        # TODO: Emit only iff force or running on Argo Workflows
        if payload is None:
            payload = {}
        try:
            # TODO: Make this endpoint configurable rather than hard-coded.
            # Handle scenarios where the URL is incorrect. Currently it hangs around
            url = "http://10.10.29.11:12000/event"
            request = urllib3.PoolManager().request(
                "POST",
                url,
                headers={"Content-Type": "application/json"},
                body=json.dumps(
                    {
                        # TODO: Ensure this schema is backwards compatible
                        "name": self.name,
                        # publish-time payload overrides accumulated entries
                        "payload": {**self._payload, **payload},
                    }
                ),
                timeout=30.0,  # should be enough - still hangs though :(
            )
            # TODO: log the fact that event has been emitted
            # TODO: should these logs be in mflogs or just orchestrator logs??
            # TODO: what should happen if event can't be emitted
            print(request.data)
        except Exception as e:
            # The original "%"-format string had no conversion specifier and
            # itself raised ValueError inside this handler; "%s" fixes that.
            print("Encountered exception while emitting Argo event: %s" % repr(e))
1 change: 0 additions & 1 deletion metaflow/plugins/argo/argo_events_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# the interface stays consistent for a similar implementation for AWS Step
# Functions and Airflow.
class ArgoEventsDecorator(FlowDecorator):

name = "trigger"
defaults = {
# TODO (savin): Introduce support for flow-dependencies.
Expand Down
5 changes: 4 additions & 1 deletion metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(
max_workers=None,
workflow_timeout=None,
workflow_priority=None,
auto_emit_argo_events=False,
):
# Some high-level notes -
#
Expand Down Expand Up @@ -119,6 +120,7 @@ def __init__(
self.max_workers = max_workers
self.workflow_timeout = workflow_timeout
self.workflow_priority = workflow_priority
self.auto_emit_argo_events = auto_emit_argo_events

self.parameters = self._process_parameters()
self._schedule, self._timezone = self._get_schedule()
Expand Down Expand Up @@ -684,7 +686,8 @@ def _container_templates(self):
"--event-logger=%s" % self.event_logger.TYPE,
"--monitor=%s" % self.monitor.TYPE,
"--no-pylint",
"--with=argo_workflows_internal",
"--with=argo_workflows_internal:auto-emit-argo-events=%s"
% self.auto_emit_argo_events,
]

if node.name == "start":
Expand Down
36 changes: 25 additions & 11 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ def cli():
default=None,
type=str,
help="Argo Workflow name. The flow name is used instead if "
"this option is not specified",
"this option is not specified. If this option is specified, "
"then any downstream workflows that need to depend on this "
"workflow via Argo Events should explicitly depend on the "
"event that matches to specified name (@trigger(event=...)) "
"rather than the flow (@trigger(flow=...)).",
)
@click.pass_obj
def argo_workflows(obj, name=None):
Expand Down Expand Up @@ -131,7 +135,7 @@ def argo_workflows(obj, name=None):
)
@click.option(
"--auto-emit-argo-events/--no-auto-emit-argo-events",
default=False,
default=True, # TODO: Default to a value from config
show_default=True,
help="Auto emits Argo Events when the run completes successfully",
)
Expand All @@ -147,7 +151,7 @@ def create(
max_workers=None,
workflow_timeout=None,
workflow_priority=None,
auto_emit_argo_events=None,
auto_emit_argo_events=False,
):
validate_tags(tags)

Expand Down Expand Up @@ -179,6 +183,7 @@ def create(
max_workers,
workflow_timeout,
workflow_priority,
auto_emit_argo_events,
)

if only_json:
Expand All @@ -203,14 +208,14 @@ def create(
obj.echo("What will trigger execution of the workflow:", bold=True)
obj.echo(flow.trigger_explanation(), indent=True)

# response = ArgoWorkflows.trigger(obj.workflow_name)
# run_id = "argo-" + response["metadata"]["name"]
response = ArgoWorkflows.trigger(obj.workflow_name)
run_id = "argo-" + response["metadata"]["name"]

# obj.echo(
# "Workflow *{name}* triggered on Argo Workflows "
# "(run-id *{run_id}*).".format(name=obj.workflow_name, run_id=run_id),
# bold=True,
# )
obj.echo(
"Workflow *{name}* triggered on Argo Workflows "
"(run-id *{run_id}*).".format(name=obj.workflow_name, run_id=run_id),
bold=True,
)


def check_python_version(obj):
Expand Down Expand Up @@ -332,7 +337,15 @@ def resolve_workflow_name(obj, name):


def make_flow(
obj, token, name, tags, namespace, max_workers, workflow_timeout, workflow_priority
obj,
token,
name,
tags,
namespace,
max_workers,
workflow_timeout,
workflow_priority,
auto_emit_argo_events,
):
# TODO: Make this check less specific to Amazon S3 as we introduce
# support for more cloud object stores.
Expand Down Expand Up @@ -378,6 +391,7 @@ def make_flow(
username=get_username(),
workflow_timeout=workflow_timeout,
workflow_priority=workflow_priority,
auto_emit_argo_events=auto_emit_argo_events,
)


Expand Down
23 changes: 23 additions & 0 deletions metaflow/plugins/argo/argo_workflows_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from metaflow.decorators import StepDecorator
from metaflow.metadata import MetaDatum

from .argo_events import ArgoEvent


class ArgoWorkflowsInternalDecorator(StepDecorator):
name = "argo_workflows_internal"

defaults = {"auto-emit-argo-events": True}

def task_pre_step(
self,
step_name,
Expand All @@ -23,6 +27,9 @@ def task_pre_step(
inputs,
):
self.task_id = task_id
self.run_id = run_id
self.argo_workflow_template = os.environ["ARGO_WORKFLOW_TEMPLATE"]

meta = {}
meta["argo-workflow-template"] = os.environ["ARGO_WORKFLOW_TEMPLATE"]
meta["argo-workflow-name"] = os.environ["ARGO_WORKFLOW_NAME"]
Expand Down Expand Up @@ -64,3 +71,19 @@ def task_finished(
# by the next task here.
with open("/mnt/out/task_id", "w") as file:
file.write(self.task_id)

# Emit Argo Events given that the flow has succeeded. Given that we only
# emit events when the flow succeeds, we can piggy back on this decorator
# hook which is guaranteed to execute only after rest of the task has
# finished execution.
if step_name == "end" and self.attributes["auto-emit-argo-events"]:
# Auto generated flow level events have the same name as the Argo Workflow
# Template that emitted them (which includes project/branch information).
name = self.argo_workflow_template
pathspec = "%s/%s" % (flow.name, self.run_id)

event = ArgoEvent(name=name)
event.add_to_payload("pathspec", "%s/%s" % (flow.name, self.run_id))
event.add_to_payload("auto-generated-by-metaflow", True)
# TODO: Add more fields
event.publish()
17 changes: 0 additions & 17 deletions metaflow/plugins/argo/test.py

This file was deleted.

0 comments on commit 69473f8

Please sign in to comment.