From 67e836742de9e897ad5c7a527516fc3c3efce87b Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Tue, 6 Jun 2023 15:36:59 -0500 Subject: [PATCH 01/15] test annotations --- metaflow/metaflow_config.py | 2 ++ metaflow/plugins/kubernetes/kubernetes.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index c66c0c25d32..05a816e4eae 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -280,6 +280,8 @@ KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "") # Default labels for kubernetes pods KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "") +# Default annotations for kubernetes pods +KUBERNETES_ANNOTATIONS = from_conf("KUBERNETES_ANNOTATIONS", "") # Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd) KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia") # Default container image for K8S diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index ccfd7d339ac..1fdf2aba16a 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -26,6 +26,7 @@ DEFAULT_AWS_CLIENT_PROVIDER, DEFAULT_METADATA, DEFAULT_SECRETS_BACKEND_TYPE, + KUBERNETES_ANNOTATIONS, KUBERNETES_FETCH_EC2_METADATA, KUBERNETES_LABELS, KUBERNETES_SANDBOX_INIT_SCRIPT, @@ -174,6 +175,9 @@ def create_job( job = ( KubernetesClient() .job( + annotations={ + "cluster-autoscaler.kubernetes.io/scale-down-disabled": "true" + }, generate_name="t-", namespace=namespace, service_account=service_account, From 94e65d87de73bc37cca0b852e7098ce1d9f265f0 Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Tue, 6 Jun 2023 16:02:24 -0500 Subject: [PATCH 02/15] add configurable annotations --- metaflow/plugins/argo/argo_workflows.py | 4 +-- metaflow/plugins/kubernetes/kubernetes.py | 30 +++++++++++++++++------ test/unit/test_kubernetes.py | 10 ++++---- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index b6429755c73..3c6f7a15e97 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -43,7 +43,7 @@ from metaflow.parameters import deploy_time_eval from metaflow.plugins.kubernetes.kubernetes import ( parse_kube_keyvalue_list, - validate_kube_labels, + validate_kube_labels_or_annotations, ) from metaflow.util import ( compress_list, @@ -217,7 +217,7 @@ def _get_kubernetes_labels(): return {} env_labels = KUBERNETES_LABELS.split(",") env_labels = parse_kube_keyvalue_list(env_labels, False) - validate_kube_labels(env_labels) + validate_kube_labels_or_annotations(env_labels) return env_labels def _get_schedule(self): diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 1fdf2aba16a..cee0ce2b52f 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -168,6 +168,7 @@ def create_job( persistent_volume_claims=None, tolerations=None, labels=None, + annotations=None, ): if env is None: env = {} @@ -175,9 +176,7 @@ def create_job( job = ( KubernetesClient() .job( - annotations={ - "cluster-autoscaler.kubernetes.io/scale-down-disabled": "true" - }, + annotations=self._get_annotations(annotations), generate_name="t-", namespace=namespace, service_account=service_account, @@ -411,23 +410,36 @@ def _get_labels(extra_labels=None): env_labels = KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else [] env_labels = parse_kube_keyvalue_list(env_labels, False) labels = {**env_labels, **extra_labels} - validate_kube_labels(labels) + validate_kube_labels_or_annotations(labels) return labels + @staticmethod + def _get_annotations(extra_annotations=None): + if extra_annotations is None: + extra_annotations = {} + env_annotations = ( + KUBERNETES_ANNOTATIONS.split(",") if KUBERNETES_ANNOTATIONS else [] + ) + env_annotations = parse_kube_keyvalue_list(env_annotations, False) + annotations = {**env_annotations, **extra_annotations} + validate_kube_annotations(annotations) + return annotations + -def validate_kube_labels( +def validate_kube_labels_or_annotations( labels: Optional[Dict[str, Optional[str]]], ) -> bool: """Validate label values. - This validates the kubernetes label values. It does not validate the keys. + This validates the kubernetes label/annotation values. It does not validate the keys. Ideally, keys should be static and also the validation rules for keys are more complex than those for values. For full validation rules, see: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set + https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set """ - def validate_label(s: Optional[str]): + def validate_label_annotation(s: Optional[str]): regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$" if not s: # allow empty label @@ -442,7 +454,9 @@ def validate_label(s: Optional[str]): ) return True - return all([validate_label(v) for v in labels.values()]) if labels else True + return ( + all([validate_label_annotation(v) for v in labels.values()]) if labels else True + ) def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True): diff --git a/test/unit/test_kubernetes.py b/test/unit/test_kubernetes.py index 2169bfd4e0b..94bf987aa83 100644 --- a/test/unit/test_kubernetes.py +++ b/test/unit/test_kubernetes.py @@ -2,7 +2,7 @@ from metaflow.plugins.kubernetes.kubernetes import ( KubernetesException, - validate_kube_labels, + validate_kube_labels_or_annotations, parse_kube_keyvalue_list, ) @@ -40,8 +40,8 @@ }, ], ) -def test_kubernetes_decorator_validate_kube_labels(labels): - assert validate_kube_labels(labels) +def test_kubernetes_decorator_validate_kube_labels_or_annotations(labels): + assert validate_kube_labels_or_annotations(labels) @pytest.mark.parametrize( @@ -65,10 +65,10 @@ def test_kubernetes_decorator_validate_kube_labels(labels): {"valid": "test", "invalid": "bißchen"}, ], ) -def test_kubernetes_decorator_validate_kube_labels_fail(labels): +def test_kubernetes_decorator_validate_kube_labels_or_annotations_fail(labels): """Fail if label contains invalid characters or is too long""" with pytest.raises(KubernetesException): - validate_kube_labels(labels) + validate_kube_labels_or_annotations(labels) @pytest.mark.parametrize( From 7bbcc1b66258ccdd3dfd25d65d273eec7e123325 Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Tue, 6 Jun 2023 16:07:30 -0500 Subject: [PATCH 03/15] correct function name --- metaflow/plugins/kubernetes/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index cee0ce2b52f..c6dcfce5f4a 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -422,7 +422,7 @@ def _get_annotations(extra_annotations=None): ) env_annotations = parse_kube_keyvalue_list(env_annotations, False) annotations = {**env_annotations, **extra_annotations} - validate_kube_annotations(annotations) + validate_kube_labels_or_annotations(annotations) return annotations From 2684fa2db1b24b2219d3a6a6a06975f773642611 Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Tue, 6 Jun 2023 16:13:12 -0500 Subject: [PATCH 04/15] make extra annotations update existing ones --- metaflow/plugins/kubernetes/kubernetes.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index c6dcfce5f4a..cb4e34b1af2 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -176,7 +176,6 @@ def create_job( job = ( KubernetesClient() .job( - annotations=self._get_annotations(annotations), generate_name="t-", namespace=namespace, service_account=service_account, @@ -296,6 +295,10 @@ def create_job( } ) + extra_annotations = self._get_annotations(annotations) + if extra_annotations: + annotations.update(extra_annotations) + for name, value in annotations.items(): job.annotation(name, value) From a3074d5cd8ccbcff088a1ef4f34cae63e2925b6d Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Tue, 6 Jun 2023 16:24:36 -0500 Subject: [PATCH 05/15] correct logic for annotation flow --- metaflow/plugins/kubernetes/kubernetes.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index cb4e34b1af2..371c762b18f 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -282,10 +282,14 @@ def create_job( for name, value in env.items(): job.environment_variable(name, value) - annotations = { - "metaflow/user": user, - "metaflow/flow_name": flow_name, - } + annotations = self._get_annotations(annotations) + annotations.update( + { + "metaflow/user": user, + "metaflow/flow_name": flow_name, + } + ) + if current.get("project_name"): annotations.update( { @@ -295,10 +299,6 @@ def create_job( } ) - extra_annotations = self._get_annotations(annotations) - if extra_annotations: - annotations.update(extra_annotations) - for name, value in annotations.items(): job.annotation(name, value) From 0be8b7bac897c4cff2443b00275f4028af137b3c Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Tue, 20 Jun 2023 22:52:20 -0500 Subject: [PATCH 06/15] Add custom annotations for argo workflows --- metaflow/plugins/argo/argo_workflows.py | 31 +++++++++++++++++++------ 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 3c6f7a15e97..b22b1bba026 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -29,6 +29,7 @@ DATATOOLS_S3ROOT, DEFAULT_METADATA, DEFAULT_SECRETS_BACKEND_TYPE, + KUBERNETES_ANNOTATIONS, KUBERNETES_FETCH_EC2_METADATA, KUBERNETES_LABELS, KUBERNETES_NAMESPACE, @@ -152,6 +153,7 @@ def __init__( self._schedule, self._timezone = self._get_schedule() self.kubernetes_labels = self._get_kubernetes_labels() + self.kubernetes_annotations = self._get_kubernetes_annotations() self._workflow_template = self._compile_workflow_template() self._sensor = self._compile_sensor() @@ -220,6 +222,19 @@ def _get_kubernetes_labels(): validate_kube_labels_or_annotations(env_labels) return env_labels + @staticmethod + def _get_kubernetes_annotations(): + """ + Get Kubernetes annotations from environment variable. + Parses the string into a dict and validates that values adhere to Kubernetes restrictions. + """ + if not KUBERNETES_ANNOTATIONS: + return {} + env_annotations = KUBERNETES_ANNOTATIONS.split(",") + env_annotations = parse_kube_keyvalue_list(env_annotations, False) + validate_kube_labels_or_annotations(env_annotations) + return env_annotations + def _get_schedule(self): schedule = self.flow._flow_decorators.get("schedule") if schedule: @@ -479,13 +494,15 @@ def _compile_workflow_template(self): # (https://github.com/argoproj/argo-workflows/issues/7432) and we are forced to # generate container templates at the top level (in WorkflowSpec) and maintain # references to them within the DAGTask. - - annotations = { - "metaflow/production_token": self.production_token, - "metaflow/owner": self.username, - "metaflow/user": "argo-workflows", - "metaflow/flow_name": self.flow.name, - } + annotations = self.kubernetes_annotations + annotations.update( + { + "metaflow/production_token": self.production_token, + "metaflow/owner": self.username, + "metaflow/user": "argo-workflows", + "metaflow/flow_name": self.flow.name, + } + ) if current.get("project_name"): annotations.update( { From c01d4abc645ec453f5f4c9ac172efb4b9bf6e5c9 Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Mon, 26 Jun 2023 12:18:59 -0500 Subject: [PATCH 07/15] remove unused label/annotation parameters, clean up functions --- metaflow/plugins/kubernetes/kubernetes.py | 55 ++++++++++------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 371c762b18f..9a05ed38ea6 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -167,8 +167,6 @@ def create_job( env=None, persistent_volume_claims=None, tolerations=None, - labels=None, - annotations=None, ): if env is None: env = {} @@ -202,7 +200,7 @@ def create_job( retries=0, step_name=step_name, tolerations=tolerations, - labels=self._get_labels(labels), + labels=self._get_labels(), use_tmpfs=use_tmpfs, tmpfs_tempdir=tmpfs_tempdir, tmpfs_size=tmpfs_size, @@ -282,22 +280,7 @@ def create_job( for name, value in env.items(): job.environment_variable(name, value) - annotations = self._get_annotations(annotations) - annotations.update( - { - "metaflow/user": user, - "metaflow/flow_name": flow_name, - } - ) - - if current.get("project_name"): - annotations.update( - { - "metaflow/project_name": current.project_name, - "metaflow/branch_name": current.branch_name, - "metaflow/project_flow_name": current.project_flow_name, - } - ) + annotations = self._get_annotations(user, flow_name, current) for name, value in annotations.items(): job.annotation(name, value) @@ -407,24 +390,34 @@ def wait_for_launch(job): ) @staticmethod - def _get_labels(extra_labels=None): - if extra_labels is None: - extra_labels = {} - env_labels = KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else [] - env_labels = parse_kube_keyvalue_list(env_labels, False) - labels = {**env_labels, **extra_labels} + def _get_labels(): + env_labels_list = KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else [] + labels = parse_kube_keyvalue_list(env_labels_list, False) validate_kube_labels_or_annotations(labels) return labels @staticmethod - def _get_annotations(extra_annotations=None): - if extra_annotations is None: - extra_annotations = {} - env_annotations = ( + def _get_annotations(user, flow_name, current): + env_annotations_list = ( KUBERNETES_ANNOTATIONS.split(",") if KUBERNETES_ANNOTATIONS else [] ) - env_annotations = parse_kube_keyvalue_list(env_annotations, False) - annotations = {**env_annotations, **extra_annotations} + annotations = parse_kube_keyvalue_list(env_annotations_list, False) + + annotations.update( + { + "metaflow/user": user, + "metaflow/flow_name": flow_name, + } + ) + + if current.get("project_name"): + annotations.update( + { + "metaflow/project_name": current.project_name, + "metaflow/branch_name": current.branch_name, + "metaflow/project_flow_name": current.project_flow_name, + } + ) validate_kube_labels_or_annotations(annotations) return annotations From 763bd1701588416b7b7f9568584778c61c9fbe75 Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Mon, 26 Jun 2023 12:28:20 -0500 Subject: [PATCH 08/15] add annotations to sensors --- metaflow/plugins/argo/argo_workflows.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index b22b1bba026..7a0ec980638 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1464,12 +1464,15 @@ def _compile_sensor(self): labels = {"app.kubernetes.io/part-of": "metaflow"} - annotations = { - "metaflow/production_token": self.production_token, - "metaflow/owner": self.username, - "metaflow/user": "argo-workflows", - "metaflow/flow_name": self.flow.name, - } + annotations = self._get_kubernetes_annotations() + annotations.update( + { + "metaflow/production_token": self.production_token, + "metaflow/owner": self.username, + "metaflow/user": "argo-workflows", + "metaflow/flow_name": self.flow.name, + } + ) if current.get("project_name"): annotations.update( { From 86f1412dc55a8e20ca12d08ec327795889662bad Mon Sep 17 00:00:00 2001 From: tylerpotts Date: Wed, 28 Jun 2023 11:20:58 -0500 Subject: [PATCH 09/15] undo reformatting of _get_annotations --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/kubernetes/kubernetes.py | 36 +++++++++++------------ 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 7a0ec980638..a07833dae68 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1464,7 +1464,7 @@ def _compile_sensor(self): labels = {"app.kubernetes.io/part-of": "metaflow"} - annotations = self._get_kubernetes_annotations() + annotations = self.kubernetes_annotations annotations.update( { "metaflow/production_token": self.production_token, diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 9a05ed38ea6..4153426b14d 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -280,7 +280,23 @@ def create_job( for name, value in env.items(): job.environment_variable(name, value) - annotations = self._get_annotations(user, flow_name, current) + annotations = self._get_annotations() + + annotations.update( + { + "metaflow/user": user, + "metaflow/flow_name": flow_name, + } + ) + + if current.get("project_name"): + annotations.update( + { + "metaflow/project_name": current.project_name, + "metaflow/branch_name": current.branch_name, + "metaflow/project_flow_name": current.project_flow_name, + } + ) for name, value in annotations.items(): job.annotation(name, value) @@ -397,27 +413,11 @@ def _get_labels(): return labels @staticmethod - def _get_annotations(user, flow_name, current): + def _get_annotations(): env_annotations_list = ( KUBERNETES_ANNOTATIONS.split(",") if KUBERNETES_ANNOTATIONS else [] ) annotations = parse_kube_keyvalue_list(env_annotations_list, False) - - annotations.update( - { - "metaflow/user": user, - "metaflow/flow_name": flow_name, - } - ) - - if current.get("project_name"): - annotations.update( - { - "metaflow/project_name": current.project_name, - "metaflow/branch_name": current.branch_name, - "metaflow/project_flow_name": current.project_flow_name, - } - ) validate_kube_labels_or_annotations(annotations) return annotations From 74a334cc169118eaa9afe71a583ed3b94705f50e Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Thu, 29 Jun 2023 15:34:52 +0300 Subject: [PATCH 10/15] rework kubernetes labels support. Also support labels as a dict through decorator now --- metaflow/plugins/argo/argo_workflows.py | 38 ++++++------- metaflow/plugins/kubernetes/kubernetes.py | 45 +-------------- metaflow/plugins/kubernetes/kubernetes_cli.py | 10 +++- .../kubernetes/kubernetes_decorator.py | 57 ++++++++++++++++++- test/unit/test_kubernetes.py | 3 +- 5 files changed, 85 insertions(+), 68 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 9056c005c36..c31cfaaefb6 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -30,7 +30,6 @@ DEFAULT_METADATA, DEFAULT_SECRETS_BACKEND_TYPE, KUBERNETES_FETCH_EC2_METADATA, - KUBERNETES_LABELS, KUBERNETES_NAMESPACE, KUBERNETES_NODE_SELECTOR, KUBERNETES_SANDBOX_INIT_SCRIPT, @@ -42,10 +41,6 @@ ) from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars from metaflow.parameters import deploy_time_eval -from metaflow.plugins.kubernetes.kubernetes import ( - parse_kube_keyvalue_list, - validate_kube_labels, -) from metaflow.util import ( compress_list, dict_to_cli_options, @@ -262,18 +257,21 @@ def trigger(cls, name, parameters=None): except Exception as e: raise ArgoWorkflowsException(str(e)) - @staticmethod - def _get_kubernetes_labels(): + def _get_kubernetes_labels(self): """ - Get Kubernetes labels from environment variable. - Parses the string into a dict and validates that values adhere to Kubernetes restrictions. + Get Kubernetes labels from the start step decorator. """ - if not KUBERNETES_LABELS: - return {} - env_labels = KUBERNETES_LABELS.split(",") - env_labels = parse_kube_keyvalue_list(env_labels, False) - validate_kube_labels(env_labels) - return env_labels + + resources = dict( + [ + deco + for node in self.graph + if node.name == "start" + for deco in node.decorators + if deco.name == "kubernetes" + ][0].attributes + ) + return resources["labels"] or {} def _get_schedule(self): schedule = self.flow._flow_decorators.get("schedule") @@ -656,11 +654,7 @@ def _compile_workflow_template(self): ) # Set common pod metadata. .pod_metadata( - Metadata() - .label("app.kubernetes.io/name", "metaflow-task") - .label("app.kubernetes.io/part-of", "metaflow") - .annotations(annotations) - .labels(self.kubernetes_labels) + Metadata().annotations(annotations).labels(self.kubernetes_labels) ) # Set the entrypoint to flow name .entrypoint(self.flow.name) @@ -1259,13 +1253,15 @@ def _container_templates(self): minutes_between_retries=minutes_between_retries, ) .metadata( - ObjectMeta().annotation("metaflow/step_name", node.name) + ObjectMeta() + .annotation("metaflow/step_name", node.name) # Unfortunately, we can't set the task_id since it is generated # inside the pod. However, it can be inferred from the annotation # set by argo-workflows - `workflows.argoproj.io/outputs` - refer # the field 'task-id' in 'parameters' # .annotation("metaflow/task_id", ...) .annotation("metaflow/attempt", retry_count) + .labels(resources["labels"]) ) # Set emptyDir volume for state management .empty_dir_volume("out") diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index fa64bdf60a2..58b46754461 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -27,7 +27,6 @@ DEFAULT_METADATA, DEFAULT_SECRETS_BACKEND_TYPE, KUBERNETES_FETCH_EC2_METADATA, - KUBERNETES_LABELS, KUBERNETES_SANDBOX_INIT_SCRIPT, S3_ENDPOINT_URL, SERVICE_HEADERS, @@ -201,7 +200,7 @@ def create_job( retries=0, step_name=step_name, tolerations=tolerations, - labels=self._get_labels(labels), + labels=labels, use_tmpfs=use_tmpfs, tmpfs_tempdir=tmpfs_tempdir, tmpfs_size=tmpfs_size, @@ -307,8 +306,6 @@ def create_job( .annotation("metaflow/step_name", step_name) .annotation("metaflow/task_id", task_id) .annotation("metaflow/attempt", attempt) - .label("app.kubernetes.io/name", "metaflow-task") - .label("app.kubernetes.io/part-of", "metaflow") ) return job.create() @@ -406,46 +403,6 @@ def wait_for_launch(job): job_id=self._job.id, ) - @staticmethod - def _get_labels(extra_labels=None): - if extra_labels is None: - extra_labels = {} - env_labels = KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else [] - env_labels = parse_kube_keyvalue_list(env_labels, False) - labels = {**env_labels, **extra_labels} - validate_kube_labels(labels) - return labels - - -def validate_kube_labels( - labels: Optional[Dict[str, Optional[str]]], -) -> bool: - """Validate label values. - - This validates the kubernetes label values. It does not validate the keys. - Ideally, keys should be static and also the validation rules for keys are - more complex than those for values. For full validation rules, see: - - https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set - """ - - def validate_label(s: Optional[str]): - regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$" - if not s: - # allow empty label - return True - if not re.search(regex_match, s): - raise KubernetesException( - 'Invalid value: "%s"\n' - "A valid label must be an empty string or one that\n" - " - Consist of alphanumeric, '-', '_' or '.' characters\n" - " - Begins and ends with an alphanumeric character\n" - " - Is at most 63 characters" % s - ) - return True - - return all([validate_label(v) for v in labels.values()]) if labels else True - def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True): try: diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index 23bc2d3e601..ccd4ef7dea0 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -7,7 +7,7 @@ from metaflow._vendor import click from metaflow.exception import METAFLOW_EXIT_DISALLOW_RETRY, CommandException from metaflow.metadata.util import sync_local_metadata_from_datastore -from metaflow.metaflow_config import DATASTORE_LOCAL_DIR, KUBERNETES_LABELS +from metaflow.metaflow_config import DATASTORE_LOCAL_DIR from metaflow.mflog import TASK_LOG_SOURCE from .kubernetes import Kubernetes, KubernetesKilledException, parse_kube_keyvalue_list @@ -105,6 +105,12 @@ def kubernetes(): type=JSONTypeClass(), multiple=False, ) +@click.option( + "--labels", + default=None, + type=JSONTypeClass(), + multiple=False, +) @click.pass_context def step( ctx, @@ -130,6 +136,7 @@ def step( run_time_limit=None, persistent_volume_claims=None, tolerations=None, + labels=None, **kwargs ): def echo(msg, stream="stderr", job_id=None, **kwargs): @@ -244,6 +251,7 @@ def _sync_metadata(): env=env, persistent_volume_claims=persistent_volume_claims, tolerations=tolerations, + labels=labels, ) except Exception as e: traceback.print_exc(chain=False) diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 4dcd7fd640a..68476533bbe 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -2,6 +2,8 @@ import os import platform import sys +import re +from typing import Optional, Dict from metaflow import current from metaflow.decorators import StepDecorator @@ -15,6 +17,7 @@ KUBERNETES_FETCH_EC2_METADATA, KUBERNETES_IMAGE_PULL_POLICY, KUBERNETES_GPU_VENDOR, + KUBERNETES_LABELS, KUBERNETES_NAMESPACE, KUBERNETES_NODE_SELECTOR, KUBERNETES_PERSISTENT_VOLUME_CLAIMS, @@ -66,6 +69,8 @@ class KubernetesDecorator(StepDecorator): in Metaflow configuration. tolerations : List[str], default: METAFLOW_KUBERNETES_TOLERATIONS Kubernetes tolerations to use when launching pod in Kubernetes. + labels: Dict[str, str], default: METAFLOW_KUBERNETES_LABELS + Kubernetes labels to use when launching pod in Kubernetes. use_tmpfs: bool, default: False This enables an explicit tmpfs mount for this step. tmpfs_tempdir: bool, default: True @@ -96,6 +101,7 @@ class KubernetesDecorator(StepDecorator): "gpu_vendor": None, "tolerations": None, # e.g., [{"key": "arch", "operator": "Equal", "value": "amd"}, # {"key": "foo", "operator": "Equal", "value": "bar"}] + "labels": None, # e.g. {"test-label": "value", "another-label":"value2"} "use_tmpfs": None, "tmpfs_tempdir": True, "tmpfs_size": None, @@ -158,6 +164,23 @@ def __init__(self, attributes=None, statically_defined=False): except (NameError, ImportError): pass + # Order of label sources for overriding: + # System > Environment variable > Decorator attributes + if not self.attributes["labels"]: + self.attributes["labels"] = {} + + if KUBERNETES_LABELS: + env_labels = parse_kube_keyvalue_list( + KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else [], False + ) + self.attributes["labels"].update(env_labels) + + system_labels = { + "app.kubernetes.io/name": "metaflow-task", + "app.kubernetes.io/part-of": "metaflow", + } + self.attributes["labels"].update(system_labels) + # If no docker image is explicitly specified, impute a default image. if not self.attributes["image"]: # If metaflow-config specifies a docker image, just use that. @@ -281,6 +304,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge ) ) + validate_kube_labels(self.attributes["labels"]) + def package_init(self, flow, step_name, environment): try: # Kubernetes is a soft dependency. @@ -332,7 +357,7 @@ def runtime_step_cli( "=".join([key, str(val)]) if val else key for key, val in v.items() ] - elif k in ["tolerations", "persistent_volume_claims"]: + elif k in ["tolerations", "persistent_volume_claims", "labels"]: cli_args.command_options[k] = json.dumps(v) else: cli_args.command_options[k] = v @@ -443,3 +468,33 @@ def _save_package_once(cls, flow_datastore, package): cls.package_url, cls.package_sha = flow_datastore.save_data( [package.blob], len_hint=1 )[0] + + +def validate_kube_labels( + labels: Optional[Dict[str, Optional[str]]], +) -> bool: + """Validate label values. + + This validates the kubernetes label values. It does not validate the keys. + Ideally, keys should be static and also the validation rules for keys are + more complex than those for values. For full validation rules, see: + + https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set + """ + + def validate_label(s: Optional[str]): + regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$" + if not s: + # allow empty label + return True + if not re.search(regex_match, s): + raise KubernetesException( + 'Invalid value: "%s"\n' + "A valid label must be an empty string or one that\n" + " - Consist of alphanumeric, '-', '_' or '.' characters\n" + " - Begins and ends with an alphanumeric character\n" + " - Is at most 63 characters" % s + ) + return True + + return all([validate_label(v) for v in labels.values()]) if labels else True diff --git a/test/unit/test_kubernetes.py b/test/unit/test_kubernetes.py index 2169bfd4e0b..2cab15f946f 100644 --- a/test/unit/test_kubernetes.py +++ b/test/unit/test_kubernetes.py @@ -2,10 +2,11 @@ from metaflow.plugins.kubernetes.kubernetes import ( KubernetesException, - validate_kube_labels, parse_kube_keyvalue_list, ) +from metaflow.plugins.kubernetes.kubernetes_decorator import validate_kube_labels + @pytest.mark.parametrize( "labels", From 13f4cc227af3cdd410971e8bffed32fe2aa9b38f Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Thu, 6 Jul 2023 12:38:49 +0300 Subject: [PATCH 11/15] rework label precedence --- .../kubernetes/kubernetes_decorator.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 68476533bbe..4220583dda8 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -164,22 +164,24 @@ def __init__(self, attributes=None, statically_defined=False): except (NameError, ImportError): pass - # Order of label sources for overriding: - # System > Environment variable > Decorator attributes - if not self.attributes["labels"]: - self.attributes["labels"] = {} - + # Label source precedence (decreasing): + # - System labels + # - Decorator labels: @kubernetes(labels={}) + # - Environment variable labels: METAFLOW_KUBERNETES_LABELS= + deco_labels = {} + if self.attributes["labels"] is not None: + deco_labels = self.attributes["labels"] + + env_labels = {} if KUBERNETES_LABELS: - env_labels = parse_kube_keyvalue_list( - KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else [], False - ) - self.attributes["labels"].update(env_labels) + env_labels = parse_kube_keyvalue_list(KUBERNETES_LABELS.split(","), False) system_labels = { "app.kubernetes.io/name": "metaflow-task", "app.kubernetes.io/part-of": "metaflow", } - self.attributes["labels"].update(system_labels) + + self.attributes["labels"] = {**env_labels, **deco_labels, **system_labels} # If no docker image is explicitly specified, impute a default image. if not self.attributes["image"]: From 8aad2d73c56860a0ec8dd92c7b0e8f181efbc561 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Thu, 6 Jul 2023 13:02:02 +0300 Subject: [PATCH 12/15] label precedence fix for sensors --- metaflow/plugins/argo/argo_workflows.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index c31cfaaefb6..ad6fb9a6f54 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -654,7 +654,11 @@ def _compile_workflow_template(self): ) # Set common pod metadata. .pod_metadata( - Metadata().annotations(annotations).labels(self.kubernetes_labels) + Metadata() + .labels(self.kubernetes_labels) + .label("app.kubernetes.io/name", "metaflow-task") + .label("app.kubernetes.io/part-of", "metaflow") + .annotations(annotations) ) # Set the entrypoint to flow name .entrypoint(self.flow.name) @@ -1553,9 +1557,9 @@ def _compile_sensor(self): ObjectMeta() .name(self.name.replace(".", "-")) .namespace(KUBERNETES_NAMESPACE) + .labels(self.kubernetes_labels) .label("app.kubernetes.io/name", "metaflow-sensor") .label("app.kubernetes.io/part-of", "metaflow") - .labels(self.kubernetes_labels) .annotations(annotations) ) .spec( From a9e4bb400113aa68de0210bd63dd9ead2c2e5a7e Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Fri, 14 Jul 2023 11:52:26 +0300 Subject: [PATCH 13/15] rename kube validator --- metaflow/plugins/kubernetes/kubernetes_decorator.py | 4 ++-- test/unit/test_kubernetes.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 4220583dda8..6b10f43c95a 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -306,7 +306,7 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge ) ) - validate_kube_labels(self.attributes["labels"]) + validate_kube_labels_or_annotations(self.attributes["labels"]) def package_init(self, flow, step_name, environment): try: @@ -472,7 +472,7 @@ def _save_package_once(cls, flow_datastore, package): )[0] -def validate_kube_labels( +def validate_kube_labels_or_annotations( labels: Optional[Dict[str, Optional[str]]], ) -> bool: """Validate label values. diff --git a/test/unit/test_kubernetes.py b/test/unit/test_kubernetes.py index 6795d7b6955..607394b2b3d 100644 --- a/test/unit/test_kubernetes.py +++ b/test/unit/test_kubernetes.py @@ -5,7 +5,9 @@ parse_kube_keyvalue_list, ) -from metaflow.plugins.kubernetes.kubernetes_decorator import validate_kube_labels +from metaflow.plugins.kubernetes.kubernetes_decorator import ( + validate_kube_labels_or_annotations, +) @pytest.mark.parametrize( From 1f4ef7be84a99819ffe74a940eb3846bfde9a125 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Fri, 14 Jul 2023 14:10:25 +0300 Subject: [PATCH 14/15] refactor Kubernetes annotations to the kube decorator --- metaflow/plugins/argo/argo_workflows.py | 41 +++++----------- metaflow/plugins/kubernetes/kubernetes.py | 34 ++++---------- metaflow/plugins/kubernetes/kubernetes_cli.py | 8 ++++ .../kubernetes/kubernetes_decorator.py | 47 ++++++++++++++++++- 4 files changed, 75 insertions(+), 55 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index ef6179a66b7..cb7370f4e6f 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -29,7 +29,6 @@ DATATOOLS_S3ROOT, DEFAULT_METADATA, DEFAULT_SECRETS_BACKEND_TYPE, - KUBERNETES_ANNOTATIONS, KUBERNETES_FETCH_EC2_METADATA, KUBERNETES_NAMESPACE, KUBERNETES_NODE_SELECTOR, @@ -275,18 +274,20 @@ def _get_kubernetes_labels(self): ) return resources["labels"] or {} - @staticmethod - def _get_kubernetes_annotations(): + def _get_kubernetes_annotations(self): """ - Get Kubernetes annotations from environment variable. - Parses the string into a dict and validates that values adhere to Kubernetes restrictions. + Get Kubernetes annotations from the start step decorator. """ - if not KUBERNETES_ANNOTATIONS: - return {} - env_annotations = KUBERNETES_ANNOTATIONS.split(",") - env_annotations = parse_kube_keyvalue_list(env_annotations, False) - validate_kube_labels_or_annotations(env_annotations) - return env_annotations + resources = dict( + [ + deco + for node in self.graph + if node.name == "start" + for deco in node.decorators + if deco.name == "kubernetes" + ][0].attributes + ) + return resources["annotations"] or {} def _get_schedule(self): schedule = self.flow._flow_decorators.get("schedule") @@ -585,14 +586,6 @@ def _compile_workflow_template(self): "metaflow/flow_name": self.flow.name, } ) - if current.get("project_name"): - annotations.update( - { - "metaflow/project_name": current.project_name, - "metaflow/branch_name": current.branch_name, - "metaflow/project_flow_name": current.project_flow_name, - } - ) return ( WorkflowTemplate() @@ -1550,8 +1543,6 @@ def _compile_sensor(self): "sdk (https://pypi.org/project/kubernetes/) first." ) - labels = {"app.kubernetes.io/part-of": "metaflow"} - annotations = self.kubernetes_annotations annotations.update( { @@ -1561,14 +1552,6 @@ def _compile_sensor(self): "metaflow/flow_name": self.flow.name, } ) - if current.get("project_name"): - annotations.update( - { - "metaflow/project_name": current.project_name, - "metaflow/branch_name": current.branch_name, - "metaflow/project_flow_name": current.project_flow_name, - } - ) return ( Sensor() diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index ceb6a84ef07..dc8e448adf4 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -26,7 +26,6 @@ DEFAULT_AWS_CLIENT_PROVIDER, DEFAULT_METADATA, DEFAULT_SECRETS_BACKEND_TYPE, - KUBERNETES_ANNOTATIONS, KUBERNETES_FETCH_EC2_METADATA, KUBERNETES_SANDBOX_INIT_SCRIPT, S3_ENDPOINT_URL, @@ -168,6 +167,7 @@ def create_job( persistent_volume_claims=None, tolerations=None, labels=None, + annotations=None, ): if env is None: env = {} @@ -202,6 +202,7 @@ def create_job( step_name=step_name, tolerations=tolerations, labels=labels, + annotations=annotations, use_tmpfs=use_tmpfs, tmpfs_tempdir=tmpfs_tempdir, tmpfs_size=tmpfs_size, @@ -286,34 +287,17 @@ def create_job( for name, value in env.items(): job.environment_variable(name, value) - annotations = self._get_annotations() - - annotations.update( - { - "metaflow/user": user, - "metaflow/flow_name": flow_name, - } - ) - - if current.get("project_name"): - annotations.update( - { - "metaflow/project_name": current.project_name, - "metaflow/branch_name": current.branch_name, - "metaflow/project_flow_name": current.project_flow_name, - } - ) + # Add job specific annotations not set in the decorator. + annotations = { + "metaflow/run_id": run_id, + "metaflow/step_name": step_name, + "metaflow/task_id": task_id, + "metaflow/attempt": attempt, + } for name, value in annotations.items(): job.annotation(name, value) - ( - job.annotation("metaflow/run_id", run_id) - .annotation("metaflow/step_name", step_name) - .annotation("metaflow/task_id", task_id) - .annotation("metaflow/attempt", attempt) - ) - return job.create() def wait(self, stdout_location, stderr_location, echo=None): diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index ccd4ef7dea0..d1817d21515 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -111,6 +111,12 @@ def kubernetes(): type=JSONTypeClass(), multiple=False, ) +@click.option( + "--annotations", + default=None, + type=JSONTypeClass(), + multiple=False, +) @click.pass_context def step( ctx, @@ -137,6 +143,7 @@ def step( persistent_volume_claims=None, tolerations=None, labels=None, + annotations=None, **kwargs ): def echo(msg, stream="stderr", job_id=None, **kwargs): @@ -252,6 +259,7 @@ def _sync_metadata(): persistent_volume_claims=persistent_volume_claims, tolerations=tolerations, labels=labels, + annotations=annotations, ) except Exception as e: traceback.print_exc(chain=False) diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 6b10f43c95a..9c7ad3e1a48 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -18,6 +18,7 @@ KUBERNETES_IMAGE_PULL_POLICY, KUBERNETES_GPU_VENDOR, KUBERNETES_LABELS, + KUBERNETES_ANNOTATIONS, KUBERNETES_NAMESPACE, KUBERNETES_NODE_SELECTOR, KUBERNETES_PERSISTENT_VOLUME_CLAIMS, @@ -71,6 +72,8 @@ class KubernetesDecorator(StepDecorator): Kubernetes tolerations to use when launching pod in Kubernetes. labels: Dict[str, str], default: METAFLOW_KUBERNETES_LABELS Kubernetes labels to use when launching pod in Kubernetes. + annotations: Dict[str, str], default: METAFLOW_KUBERNETES_ANNOTATIONS + Kubernetes annotations to use when launching pod in Kubernetes. use_tmpfs: bool, default: False This enables an explicit tmpfs mount for this step. tmpfs_tempdir: bool, default: True @@ -102,6 +105,7 @@ class KubernetesDecorator(StepDecorator): "tolerations": None, # e.g., [{"key": "arch", "operator": "Equal", "value": "amd"}, # {"key": "foo", "operator": "Equal", "value": "bar"}] "labels": None, # e.g. {"test-label": "value", "another-label":"value2"} + "annotations": None, # e.g. {"note": "value", "another-note": "value2"} "use_tmpfs": None, "tmpfs_tempdir": True, "tmpfs_size": None, @@ -183,6 +187,41 @@ def __init__(self, attributes=None, statically_defined=False): self.attributes["labels"] = {**env_labels, **deco_labels, **system_labels} + # Annotations + # annotation precedence (decreasing): + # - System annotations + # - Decorator annotations + # - Environment annotations + deco_annotations = {} + if self.attributes["annotations"] is not None: + deco_annotations = self.attributes["annotations"] + + env_annotations = {} + if KUBERNETES_ANNOTATIONS: + env_annotations = parse_kube_keyvalue_list( + KUBERNETES_ANNOTATIONS.split(","), False + ) + + system_annotations = { + "metaflow/user": current.username, + "metaflow/flow_name": current.flow_name, + } + + if current.get("project_name"): + system_annotations.update( + { + "metaflow/project_name": current.project_name, + "metaflow/branch_name": current.branch_name, + "metaflow/project_flow_name": current.project_flow_name, + } + ) + + self.attributes["annotations"] = { + **env_annotations, + **deco_annotations, + **system_annotations, + } + # If no docker image is explicitly specified, impute a default image. if not self.attributes["image"]: # If metaflow-config specifies a docker image, just use that. @@ -307,6 +346,7 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge ) validate_kube_labels_or_annotations(self.attributes["labels"]) + validate_kube_labels_or_annotations(self.attributes["annotations"]) def package_init(self, flow, step_name, environment): try: @@ -359,7 +399,12 @@ def runtime_step_cli( "=".join([key, str(val)]) if val else key for key, val in v.items() ] - elif k in ["tolerations", "persistent_volume_claims", "labels"]: + elif k in [ + "tolerations", + "persistent_volume_claims", + "labels", + "annotations", + ]: cli_args.command_options[k] = json.dumps(v) else: cli_args.command_options[k] = v From 35bc3ba42f65c1784a6eb001e8a737b4b0aae4c4 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Fri, 14 Jul 2023 14:31:42 +0300 Subject: [PATCH 15/15] clean up setting annotations in argo workflows --- metaflow/plugins/argo/argo_workflows.py | 49 ++++++++++++------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index cb7370f4e6f..11e3ea3f29c 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -276,7 +276,7 @@ def _get_kubernetes_labels(self): def _get_kubernetes_annotations(self): """ - Get Kubernetes annotations from the start step decorator. + Get Kubernetes annotations from the start step decorator. Append Argo specific annotations. """ resources = dict( [ @@ -287,7 +287,20 @@ def _get_kubernetes_annotations(self): if deco.name == "kubernetes" ][0].attributes ) - return resources["annotations"] or {} + annotations = {} + if resources["annotations"] is not None: + # make a copy so we do not mess possible start-step specific annotations. + annotations = resources["annotations"].copy() + + annotations.update( + { + "metaflow/production_token": self.production_token, + "metaflow/owner": self.username, + "metaflow/user": "argo-workflows", + "metaflow/flow_name": self.flow.name, + } + ) + return annotations def _get_schedule(self): schedule = self.flow._flow_decorators.get("schedule") @@ -577,15 +590,6 @@ def _compile_workflow_template(self): # (https://github.com/argoproj/argo-workflows/issues/7432) and we are forced to # generate container templates at the top level (in WorkflowSpec) and maintain # references to them within the DAGTask. - annotations = self.kubernetes_annotations - annotations.update( - { - "metaflow/production_token": self.production_token, - "metaflow/owner": self.username, - "metaflow/user": "argo-workflows", - "metaflow/flow_name": self.flow.name, - } - ) return ( WorkflowTemplate() @@ -599,7 +603,7 @@ def _compile_workflow_template(self): .namespace(KUBERNETES_NAMESPACE) .label("app.kubernetes.io/name", "metaflow-flow") .label("app.kubernetes.io/part-of", "metaflow") - .annotations(annotations) + .annotations(self.kubernetes_annotations) ) .spec( WorkflowSpec() @@ -632,7 +636,10 @@ def _compile_workflow_template(self): .label("app.kubernetes.io/name", "metaflow-run") .label("app.kubernetes.io/part-of", "metaflow") .annotations( - {**annotations, **{"metaflow/run_id": "argo-{{workflow.name}}"}} + { + **self.kubernetes_annotations, + **{"metaflow/run_id": "argo-{{workflow.name}}"}, + } ) # TODO: Set dynamic labels using labels_from. Ideally, we would # want to expose run_id as a label. It's easy to add labels, @@ -668,7 +675,7 @@ def _compile_workflow_template(self): .labels(self.kubernetes_labels) .label("app.kubernetes.io/name", "metaflow-task") .label("app.kubernetes.io/part-of", "metaflow") - .annotations(annotations) + .annotations(self.kubernetes_annotations) ) # Set the entrypoint to flow name .entrypoint(self.flow.name) @@ -1543,16 +1550,6 @@ def _compile_sensor(self): "sdk (https://pypi.org/project/kubernetes/) first." ) - annotations = self.kubernetes_annotations - annotations.update( - { - "metaflow/production_token": self.production_token, - "metaflow/owner": self.username, - "metaflow/user": "argo-workflows", - "metaflow/flow_name": self.flow.name, - } - ) - return ( Sensor() .metadata( @@ -1563,7 +1560,7 @@ def _compile_sensor(self): .labels(self.kubernetes_labels) .label("app.kubernetes.io/name", "metaflow-sensor") .label("app.kubernetes.io/part-of", "metaflow") - .annotations(annotations) + .annotations(self.kubernetes_annotations) ) .spec( SensorSpec().template( @@ -1573,7 +1570,7 @@ def _compile_sensor(self): ObjectMeta() .label("app.kubernetes.io/name", "metaflow-sensor") .label("app.kubernetes.io/part-of", "metaflow") - .annotations(annotations) + .annotations(self.kubernetes_annotations) ) .container( # Run sensor in guaranteed QoS. The sensor isn't doing a lot