From eed0f0400fa415a1f37b09e8fc62bba5f25f2e32 Mon Sep 17 00:00:00 2001 From: savin Date: Mon, 15 May 2023 14:34:45 -0700 Subject: [PATCH 1/3] Support ArgoEvent object with @kubernetes --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/kubernetes/kubernetes.py | 16 ++++------------ 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index ee79f9d4cdb..882bfdbf601 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -980,7 +980,7 @@ def _container_templates(self): "METAFLOW_OWNER": self.username, }, **{ - # Configuration for Argo Events. Keep these in sync with the + # Configuration for Argo Events. Keep these in sync with the # environment variables for @kubernetes decorator. "METAFLOW_ARGO_EVENTS_EVENT": ARGO_EVENTS_EVENT, "METAFLOW_ARGO_EVENTS_EVENT_BUS": ARGO_EVENTS_EVENT_BUS, diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 464c8bddfbe..6b5a82b7a64 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -252,19 +252,11 @@ def create_job( ) # Set environment variables to support metaflow.integrations.ArgoEvent - job.environment_variable( - "METAFLOW_ARGO_EVENTS_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL - ) + job.environment_variable("METAFLOW_ARGO_EVENTS_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL) job.environment_variable("METAFLOW_ARGO_EVENTS_EVENT", ARGO_EVENTS_EVENT) - job.environment_variable( - "METAFLOW_ARGO_EVENTS_EVENT_BUS", ARGO_EVENTS_EVENT_BUS - ) - job.environment_variable( - "METAFLOW_ARGO_EVENTS_EVENT_SOURCE", ARGO_EVENTS_EVENT_SOURCE - ) - job.environment_variable( - "METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT", ARGO_EVENTS_SERVICE_ACCOUNT - ) + job.environment_variable("METAFLOW_ARGO_EVENTS_EVENT_BUS", ARGO_EVENTS_EVENT_BUS) + job.environment_variable("METAFLOW_ARGO_EVENTS_EVENT_SOURCE",ARGO_EVENTS_EVENT_SOURCE) + job.environment_variable("METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT",ARGO_EVENTS_SERVICE_ACCOUNT) tmpfs_enabled = use_tmpfs or (tmpfs_size and not use_tmpfs) if tmpfs_enabled and tmpfs_tempdir: From 89813925c6ad4486f75bdae3273873e1f6f8f8fd Mon Sep 17 00:00:00 2001 From: savin Date: Mon, 15 May 2023 14:36:27 -0700 Subject: [PATCH 2/3] apply black --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/kubernetes/kubernetes.py | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 882bfdbf601..ee79f9d4cdb 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -980,7 +980,7 @@ def _container_templates(self): "METAFLOW_OWNER": self.username, }, **{ - # Configuration for Argo Events. Keep these in sync with the + # Configuration for Argo Events. Keep these in sync with the # environment variables for @kubernetes decorator. "METAFLOW_ARGO_EVENTS_EVENT": ARGO_EVENTS_EVENT, "METAFLOW_ARGO_EVENTS_EVENT_BUS": ARGO_EVENTS_EVENT_BUS, diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 6b5a82b7a64..464c8bddfbe 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -252,11 +252,19 @@ def create_job( ) # Set environment variables to support metaflow.integrations.ArgoEvent - job.environment_variable("METAFLOW_ARGO_EVENTS_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL) + job.environment_variable( + "METAFLOW_ARGO_EVENTS_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL + ) job.environment_variable("METAFLOW_ARGO_EVENTS_EVENT", ARGO_EVENTS_EVENT) - job.environment_variable("METAFLOW_ARGO_EVENTS_EVENT_BUS", ARGO_EVENTS_EVENT_BUS) - job.environment_variable("METAFLOW_ARGO_EVENTS_EVENT_SOURCE",ARGO_EVENTS_EVENT_SOURCE) - job.environment_variable("METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT",ARGO_EVENTS_SERVICE_ACCOUNT) + job.environment_variable( + "METAFLOW_ARGO_EVENTS_EVENT_BUS", ARGO_EVENTS_EVENT_BUS + ) + job.environment_variable( + "METAFLOW_ARGO_EVENTS_EVENT_SOURCE", ARGO_EVENTS_EVENT_SOURCE + ) + job.environment_variable( + "METAFLOW_ARGO_EVENTS_SERVICE_ACCOUNT", ARGO_EVENTS_SERVICE_ACCOUNT + ) tmpfs_enabled = use_tmpfs or (tmpfs_size and not use_tmpfs) if tmpfs_enabled and tmpfs_tempdir: From 3899708c8631ffbbac163dae3b4d77a7e151cdbf Mon Sep 17 00:00:00 2001 From: savin Date: Mon, 15 May 2023 15:16:50 -0700 Subject: [PATCH 3/3] Introduce Argo Workflows UI Url in the deployment post-text --- metaflow/metaflow_config.py | 1 + metaflow/plugins/argo/argo_workflows_cli.py | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 2cbb214bd4e..a0f0831ee14 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -302,6 +302,7 @@ ARGO_EVENTS_EVENT = from_conf("ARGO_EVENTS_EVENT") ARGO_EVENTS_WEBHOOK_URL = from_conf("ARGO_EVENTS_WEBHOOK_URL") +ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL") ## # Airflow Configuration diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py index 0ef28b5f339..4fd01c74486 100644 --- a/metaflow/plugins/argo/argo_workflows_cli.py +++ b/metaflow/plugins/argo/argo_workflows_cli.py @@ -9,7 +9,12 @@ from metaflow import JSONType, current, decorators, parameters from metaflow._vendor import click from metaflow.exception import MetaflowException, MetaflowInternalError -from metaflow.metaflow_config import SERVICE_VERSION_CHECK, UI_URL +from metaflow.metaflow_config import ( + SERVICE_VERSION_CHECK, + UI_URL, + ARGO_WORKFLOWS_UI_URL, + KUBERNETES_NAMESPACE, +) from metaflow.package import MetaflowPackage # TODO: Move production_token to utils @@ -201,6 +206,16 @@ def create( "due to Kubernetes naming conventions\non Argo Workflows. The " "original flow name is stored in the workflow annotation.\n" ) + if ARGO_WORKFLOWS_UI_URL: + obj.echo( + "See it in the Argo Workflows UI here - \n" + "%s/workflow-templates/%s/%s\n" + % ( + ARGO_WORKFLOWS_UI_URL.rstrip("/"), + KUBERNETES_NAMESPACE, + obj.workflow_name, + ) + ) flow.schedule() obj.echo("What will trigger execution of the workflow:", bold=True) obj.echo(flow.trigger_explanation(), indent=True)