Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add labels and fix argo #1360

Merged
merged 11 commits into from
May 23, 2023
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@
KUBERNETES_NODE_SELECTOR = from_conf("KUBERNETES_NODE_SELECTOR", "")
KUBERNETES_TOLERATIONS = from_conf("KUBERNETES_TOLERATIONS", "")
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
Expand Down
10 changes: 6 additions & 4 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,20 +887,22 @@ def _container_templates(self):
.retry_strategy(
times=total_retries,
minutes_between_retries=minutes_between_retries,
)
.metadata(
).metadata(
ObjectMeta().annotation("metaflow/step_name", node.name)
# Unfortunately, we can't set the task_id since it is generated
# inside the pod. However, it can be inferred from the annotation
# set by argo-workflows - `workflows.argoproj.io/outputs` - refer
# the field 'task-id' in 'parameters'
# .annotation("metaflow/task_id", ...)
.annotation("metaflow/attempt", retry_count)
# Set labels
.labels(resources.get("labels"))
dhpollack marked this conversation as resolved.
Show resolved Hide resolved
)
# Set emptyDir volume for state management
.empty_dir_volume("out")
# Set node selectors
.node_selectors(resources.get("node_selector"))
# Set tolerations
.tolerations(resources.get("tolerations"))
# Set container
.container(
Expand Down Expand Up @@ -1062,7 +1064,7 @@ def label(self, key, value):
def labels(self, labels):
    """Merge `labels` into the ``labels`` entry of this object's payload.

    Parameters
    ----------
    labels : Dict[str, str] or None
        Labels to add. ``None`` (or an empty mapping) adds nothing, but the
        ``labels`` entry is still created so it always exists afterwards.

    Returns
    -------
    self, so that calls can be chained.
    """
    # `labels or {}` keeps a missing label set from raising a TypeError in
    # dict.update(); callers pass `resources.get("labels")`, which may be None.
    self.payload.setdefault("labels", {}).update(labels or {})
    return self

def name(self, name):
Expand Down Expand Up @@ -1171,7 +1173,7 @@ def label(self, key, value):
def labels(self, labels):
    """Merge `labels` into the ``labels`` entry of this object's payload.

    Parameters
    ----------
    labels : Dict[str, str] or None
        Labels to add. ``None`` (or an empty mapping) adds nothing, but the
        ``labels`` entry is still created so it always exists afterwards.

    Returns
    -------
    self, so that calls can be chained.
    """
    # Tolerate a missing label set instead of letting dict.update(None)
    # raise a TypeError.
    self.payload.setdefault("labels", {}).update(labels or {})
    return self

def labels_from(self, labels_from):
Expand Down
6 changes: 5 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from metaflow.metaflow_config import (
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
CARD_AZUREROOT,
CARD_GSROOT,
CARD_S3ROOT,
DATASTORE_SYSROOT_S3,
DATATOOLS_S3ROOT,
Expand All @@ -29,8 +31,8 @@
BASH_SAVE_LOGS,
bash_capture_logs,
export_mflog_env_vars,
tail_logs,
get_log_tailer,
tail_logs,
)

from .kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -152,6 +154,7 @@ def create_job(
run_time_limit=None,
env=None,
tolerations=None,
labels=None,
):

if env is None:
Expand Down Expand Up @@ -185,6 +188,7 @@ def create_job(
retries=0,
step_name=step_name,
tolerations=tolerations,
labels=labels,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
17 changes: 15 additions & 2 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import traceback

from metaflow import util, JSONTypeClass
from metaflow import JSONTypeClass, util
from metaflow._vendor import click
from metaflow.exception import METAFLOW_EXIT_DISALLOW_RETRY, CommandException
from metaflow.metadata.util import sync_local_metadata_from_datastore
Expand Down Expand Up @@ -91,6 +91,12 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--labels",
multiple=True,
default=None,
help="Labels for Kubernetes pod.",
)
@click.pass_context
def step(
ctx,
Expand All @@ -110,6 +116,7 @@ def step(
gpu_vendor=None,
run_time_limit=None,
tolerations=None,
labels=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -175,7 +182,12 @@ def echo(msg, stream="stderr", job_id=None, **kwargs):
stderr_location = ds.get_log_location(TASK_LOG_SOURCE, "stderr")

# `node_selector` is a tuple of strings, convert it to a dictionary
node_selector = KubernetesDecorator.parse_node_selector(node_selector)
node_selector = KubernetesDecorator.parse_kube_keyvalue_list(node_selector)

# `labels` is a tuple of strings or a tuple with a single comma separated string
# convert it to a dict
labels = KubernetesDecorator.parse_kube_keyvalue_list(labels, False)
KubernetesDecorator.validate_kube_labels(labels)

def _sync_metadata():
if ctx.obj.metadata.TYPE == "local":
Expand Down Expand Up @@ -218,6 +230,7 @@ def _sync_metadata():
run_time_limit=run_time_limit,
env=env,
tolerations=tolerations,
labels=labels,
)
except Exception as e:
traceback.print_exc(chain=False)
Expand Down
110 changes: 96 additions & 14 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import hashlib
import json
import os
import platform
import re
import sys
from typing import Dict, List, Optional, Union

from metaflow.decorators import StepDecorator
from metaflow.exception import MetaflowException
Expand All @@ -12,11 +15,12 @@
KUBERNETES_CONTAINER_IMAGE,
KUBERNETES_CONTAINER_REGISTRY,
KUBERNETES_GPU_VENDOR,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_TOLERATIONS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SECRETS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_FETCH_EC2_METADATA,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
Expand Down Expand Up @@ -65,6 +69,8 @@ class KubernetesDecorator(StepDecorator):
in Metaflow configuration.
tolerations : List[str], default: METAFLOW_KUBERNETES_TOLERATIONS
Kubernetes tolerations to use when launching pod in Kubernetes.
labels : Dict[str, str], default: METAFLOW_KUBERNETES_LABELS
Kubernetes labels to use when launching pod in Kubernetes.
"""

name = "kubernetes"
Expand All @@ -76,6 +82,7 @@ class KubernetesDecorator(StepDecorator):
"service_account": None,
"secrets": None, # e.g., mysecret
"node_selector": None, # e.g., kubernetes.io/os=linux
"labels": None, # e.g., my_label=my_value
"namespace": None,
"gpu": None, # value of 0 implies that the scheduled node should not have GPUs
"gpu_vendor": None,
Expand All @@ -99,9 +106,17 @@ def __init__(self, attributes=None, statically_defined=False):
self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR
if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS:
self.attributes["tolerations"] = json.loads(KUBERNETES_TOLERATIONS)
if not self.attributes["labels"] and KUBERNETES_LABELS:
self.attributes["labels"] = KUBERNETES_LABELS

if isinstance(self.attributes["labels"], str):
self.attributes["labels"] = self.parse_kube_keyvalue_list(
self.attributes["labels"].split(","), False
)
self.validate_kube_labels(self.attributes["labels"])

if isinstance(self.attributes["node_selector"], str):
self.attributes["node_selector"] = self.parse_node_selector(
self.attributes["node_selector"] = self.parse_kube_keyvalue_list(
self.attributes["node_selector"].split(",")
)

Expand Down Expand Up @@ -280,10 +295,11 @@ def runtime_step_cli(
for k, v in self.attributes.items():
if k == "namespace":
cli_args.command_options["k8s_namespace"] = v
elif k == "node_selector" and v:
cli_args.command_options[k] = ",".join(
["=".join([key, str(val)]) for key, val in v.items()]
)
elif k in {"node_selector", "labels"} and v:
cli_args.command_options[k] = [
"=".join([key, str(val)]) if val else key
for key, val in v.items()
]
elif k == "tolerations":
cli_args.command_options[k] = json.dumps(v)
else:
Expand Down Expand Up @@ -391,14 +407,80 @@ def _save_package_once(cls, flow_datastore, package):
[package.blob], len_hint=1
)[0]

@classmethod
def _parse_decorator_spec(cls, deco_spec: str):
    """Normalize a decorator spec string and hand it to the base parser.

    The ``labels`` and ``node_selector`` options may be given either as a
    JSON object or as a comma separated ``key=value`` list. The latter form
    is rewritten into compact JSON before the spec is passed on, so that the
    commas inside it are not mistaken for option separators.

    Parameters
    ----------
    deco_spec : str
        Comma separated ``option=value`` pairs. An empty spec yields a
        decorator built with no arguments.

    Raises
    ------
    KubernetesException
        If a JSON object holds a value that is neither a string nor null,
        if a value looks like JSON (starts with ``{``) but cannot be decoded,
        or if a ``key=value`` list cannot be parsed.
    """
    if not deco_spec:
        return cls()

    # Split only on commas that are directly followed by a known option name
    # and "=". The names must form an alternation group: inside a character
    # class ("[a|b]") they would match a single letter, and the spec would be
    # split in front of any label key that merely ends in one of those
    # letters (e.g. ",team=").
    option_names = "|".join(re.escape(option) for option in cls.defaults)
    separator = r",(?=\s*(?:{})=)".format(option_names)

    deco_spec_parts = []
    for part in re.split(separator, deco_spec):
        name, val = part.split("=", 1)
        # The separator tolerates whitespace before the option name, so
        # compare the stripped name.
        option = name.strip()
        if option in {"labels", "node_selector"}:
            try:
                tmp_vals = json.loads(val.strip().replace('\\"', '"'))
            except json.JSONDecodeError:
                if val.strip().startswith("{"):
                    raise KubernetesException(
                        "Malform json detected in %s" % str(val)
                    )
                tmp_vals = None
            if isinstance(tmp_vals, dict):
                for val_i in tmp_vals.values():
                    if not (val_i is None or isinstance(val_i, str)):
                        raise KubernetesException(
                            "All values must be string or null."
                        )
            else:
                # Not a JSON object (this includes scalars such as "5" that
                # happen to be valid JSON): treat the value as a key=value
                # list. Only node selectors require a value for every key.
                both = option == "node_selector"
                val = json.dumps(
                    cls.parse_kube_keyvalue_list(val.split(","), both),
                    separators=(",", ":"),
                )
        deco_spec_parts.append("=".join([name, val]))
    deco_spec_parsed = ",".join(deco_spec_parts)
    return super()._parse_decorator_spec(deco_spec_parsed)

@staticmethod
def parse_node_selector(node_selector: list):
def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True):
try:
return {
str(k.split("=", 1)[0]): str(k.split("=", 1)[1])
for k in node_selector or []
}
ret = {}
for item_str in items:
item = item_str.split("=", 1)
if requires_both:
item[1] # raise IndexError
if str(item[0]) in ret:
raise KubernetesException("Duplicate key found: %s" % str(item[0]))
ret[str(item[0])] = str(item[1]) if len(item) > 1 else None
return ret
except KubernetesException as e:
raise e
except (AttributeError, IndexError):
raise KubernetesException(
"Unable to parse node_selector: %s" % node_selector
)
raise KubernetesException("Unable to parse kubernetes list: %s" % items)

@staticmethod
def validate_kube_labels(
labels: Optional[Dict[str, Optional[str]]],
) -> bool:
"""Validate label values.

This validates the kubernetes label values. It does not validate the keys.
Ideally, keys should be static and also the validation rules for keys are
more complex than those for values. For full validation rules, see:

https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
"""

def validate_label(s: Optional[str]):
regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$"
if not s:
# allow empty label
return True
if not re.search(regex_match, s):
raise KubernetesException(
'Invalid value: "%s"\n'
"A valid label must be an empty string or one that\n"
" - Consist of alphanumeric, '-', '_' or '.' characters\n"
" - Begins and ends with an alphanumeric character\n"
" - Is at most 63 characters" % s
)
return True

return all([validate_label(v) for v in labels.values()]) if labels else True
94 changes: 94 additions & 0 deletions test/unit/test_kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import pytest

from metaflow.plugins.kubernetes.kubernetes import KubernetesException
from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator


@pytest.mark.parametrize(
    "labels",
    [
        None,
        {"label": "value"},
        {"label1": "val1", "label2": "val2"},
        {"label1": "val1", "label2": None},
        {"label": "a"},
        {"label": ""},
        # 63 characters, the maximum length the validator allows
        {"label": "1234567890" * 6 + "123"},
        # 63 characters with '-', '_' and '.' in the middle
        {"label": "1234567890" * 2 + "1234-_.890" + "1234567890" * 3 + "123"},
    ],
)
def test_kubernetes_decorator_validate_kube_labels(labels):
    """Well-formed label sets (including None and empty values) validate."""
    is_valid = KubernetesDecorator.validate_kube_labels(labels)
    assert is_valid


@pytest.mark.parametrize(
    "labels",
    [
        {"label": "a-"},
        {"label": ".a"},
        {"label": "test()"},
        # 64 characters, one more than the validator allows
        {"label": "1234567890" * 6 + "1234"},
        {"label": "(){}??"},
        {"valid": "test", "invalid": "bißchen"},
    ],
)
def test_kubernetes_decorator_validate_kube_labels_fail(labels):
    """Values with invalid characters or excessive length are rejected."""
    with pytest.raises(KubernetesException):
        KubernetesDecorator.validate_kube_labels(labels)


@pytest.mark.parametrize(
    "items,requires_both,expected",
    [
        (["key=value"], True, {"key": "value"}),
        (["key=value"], False, {"key": "value"}),
        (["key"], False, {"key": None}),
        (["key=value", "key2=value2"], True, {"key": "value", "key2": "value2"}),
    ],
)
def test_kubernetes_parse_keyvalue_list(items, requires_both, expected):
    """Parsable key/value lists are converted into the expected dictionary."""
    assert (
        KubernetesDecorator.parse_kube_keyvalue_list(items, requires_both)
        == expected
    )


@pytest.mark.parametrize(
    "items,requires_both",
    [
        (["key=value", "key=value2"], True),
        (["key"], True),
    ],
)
def test_kubernetes_parse_keyvalue_list_fail(items, requires_both):
    """Duplicate keys, or a missing value when one is required, are rejected."""
    # This test needs its own name: reusing the name of the success-case test
    # rebinds it at module level, so pytest would never collect the
    # success cases.
    with pytest.raises(KubernetesException):
        KubernetesDecorator.parse_kube_keyvalue_list(items, requires_both)