Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds custom annotations via env variables #1442

Merged
merged 17 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods, given as a single comma-separated string
# (consumers split it on "," before parsing); an empty string means no labels.
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default annotations for kubernetes pods, given as a single comma-separated
# string (consumers split it on "," before parsing); an empty string means no
# extra annotations.
KUBERNETES_ANNOTATIONS = from_conf("KUBERNETES_ANNOTATIONS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
Expand Down
50 changes: 35 additions & 15 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
DATATOOLS_S3ROOT,
DEFAULT_METADATA,
DEFAULT_SECRETS_BACKEND_TYPE,
KUBERNETES_ANNOTATIONS,
KUBERNETES_FETCH_EC2_METADATA,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
Expand All @@ -43,7 +44,7 @@
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import (
parse_kube_keyvalue_list,
validate_kube_labels,
validate_kube_labels_or_annotations,
)
from metaflow.util import (
compress_list,
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(
self._schedule, self._timezone = self._get_schedule()

self.kubernetes_labels = self._get_kubernetes_labels()
self.kubernetes_annotations = self._get_kubernetes_annotations()
self._workflow_template = self._compile_workflow_template()
self._sensor = self._compile_sensor()

Expand Down Expand Up @@ -217,9 +219,22 @@ def _get_kubernetes_labels():
return {}
env_labels = KUBERNETES_LABELS.split(",")
env_labels = parse_kube_keyvalue_list(env_labels, False)
validate_kube_labels(env_labels)
validate_kube_labels_or_annotations(env_labels)
return env_labels

@staticmethod
def _get_kubernetes_annotations():
    """
    Build the annotation dict configured through KUBERNETES_ANNOTATIONS.

    The setting is split on "," and handed to parse_kube_keyvalue_list; the
    parsed values are then checked with validate_kube_labels_or_annotations.
    Returns an empty dict when the setting is empty.
    """
    # NOTE(review): values go through the same validator that is used for
    # labels - confirm label-style restrictions are intended for annotations.
    if KUBERNETES_ANNOTATIONS:
        raw_items = KUBERNETES_ANNOTATIONS.split(",")
        parsed = parse_kube_keyvalue_list(raw_items, False)
        validate_kube_labels_or_annotations(parsed)
        return parsed
    return {}

def _get_schedule(self):
schedule = self.flow._flow_decorators.get("schedule")
if schedule:
Expand Down Expand Up @@ -479,13 +494,15 @@ def _compile_workflow_template(self):
# (https://github.com/argoproj/argo-workflows/issues/7432) and we are forced to
# generate container templates at the top level (in WorkflowSpec) and maintain
# references to them within the DAGTask.

annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}
annotations = self.kubernetes_annotations
tylerpotts marked this conversation as resolved.
Show resolved Hide resolved
annotations.update(
{
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}
)
if current.get("project_name"):
annotations.update(
{
Expand Down Expand Up @@ -1447,12 +1464,15 @@ def _compile_sensor(self):

labels = {"app.kubernetes.io/part-of": "metaflow"}

annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}
annotations = self._get_kubernetes_annotations()
tylerpotts marked this conversation as resolved.
Show resolved Hide resolved
annotations.update(
{
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}
)
if current.get("project_name"):
annotations.update(
{
Expand Down
64 changes: 39 additions & 25 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DEFAULT_AWS_CLIENT_PROVIDER,
DEFAULT_METADATA,
DEFAULT_SECRETS_BACKEND_TYPE,
KUBERNETES_ANNOTATIONS,
KUBERNETES_FETCH_EC2_METADATA,
KUBERNETES_LABELS,
KUBERNETES_SANDBOX_INIT_SCRIPT,
Expand Down Expand Up @@ -166,7 +167,6 @@ def create_job(
env=None,
persistent_volume_claims=None,
tolerations=None,
labels=None,
):
if env is None:
env = {}
Expand Down Expand Up @@ -200,7 +200,7 @@ def create_job(
retries=0,
step_name=step_name,
tolerations=tolerations,
labels=self._get_labels(labels),
labels=self._get_labels(),
use_tmpfs=use_tmpfs,
tmpfs_tempdir=tmpfs_tempdir,
tmpfs_size=tmpfs_size,
Expand Down Expand Up @@ -280,18 +280,7 @@ def create_job(
for name, value in env.items():
job.environment_variable(name, value)

annotations = {
"metaflow/user": user,
"metaflow/flow_name": flow_name,
}
if current.get("project_name"):
annotations.update(
{
"metaflow/project_name": current.project_name,
"metaflow/branch_name": current.branch_name,
"metaflow/project_flow_name": current.project_flow_name,
}
)
annotations = self._get_annotations(user, flow_name, current)
tylerpotts marked this conversation as resolved.
Show resolved Hide resolved

for name, value in annotations.items():
job.annotation(name, value)
Expand Down Expand Up @@ -401,29 +390,52 @@ def wait_for_launch(job):
)

@staticmethod
def _get_labels(extra_labels=None):
if extra_labels is None:
extra_labels = {}
env_labels = KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else []
env_labels = parse_kube_keyvalue_list(env_labels, False)
labels = {**env_labels, **extra_labels}
validate_kube_labels(labels)
def _get_labels():
env_labels_list = KUBERNETES_LABELS.split(",") if KUBERNETES_LABELS else []
labels = parse_kube_keyvalue_list(env_labels_list, False)
validate_kube_labels_or_annotations(labels)
return labels

@staticmethod
def _get_annotations(user, flow_name, current):
    """
    Assemble the annotations to attach to a Kubernetes job.

    Starts from the entries configured through KUBERNETES_ANNOTATIONS (split
    on "," and parsed with parse_kube_keyvalue_list), then overlays the
    Metaflow-managed "metaflow/*" keys, so the managed keys win on conflict.
    Project-related keys are added only when `current` reports a project name.
    The merged values are validated before being returned.
    """
    raw_items = []
    if KUBERNETES_ANNOTATIONS:
        raw_items = KUBERNETES_ANNOTATIONS.split(",")
    merged = parse_kube_keyvalue_list(raw_items, False)

    # Collect the Metaflow-managed entries first, then apply them in one go.
    managed = {
        "metaflow/user": user,
        "metaflow/flow_name": flow_name,
    }
    if current.get("project_name"):
        managed["metaflow/project_name"] = current.project_name
        managed["metaflow/branch_name"] = current.branch_name
        managed["metaflow/project_flow_name"] = current.project_flow_name
    merged.update(managed)

    validate_kube_labels_or_annotations(merged)
    return merged


def validate_kube_labels(
def validate_kube_labels_or_annotations(
labels: Optional[Dict[str, Optional[str]]],
) -> bool:
"""Validate label values.

This validates the kubernetes label values. It does not validate the keys.
This validates the kubernetes label/annotation values. It does not validate the keys.
Ideally, keys should be static and also the validation rules for keys are
more complex than those for values. For full validation rules, see:

https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
"""

def validate_label(s: Optional[str]):
def validate_label_annotation(s: Optional[str]):
regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$"
if not s:
# allow empty label
Expand All @@ -438,7 +450,9 @@ def validate_label(s: Optional[str]):
)
return True

return all([validate_label(v) for v in labels.values()]) if labels else True
return (
all([validate_label_annotation(v) for v in labels.values()]) if labels else True
)


def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True):
Expand Down
10 changes: 5 additions & 5 deletions test/unit/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from metaflow.plugins.kubernetes.kubernetes import (
KubernetesException,
validate_kube_labels,
validate_kube_labels_or_annotations,
parse_kube_keyvalue_list,
)

Expand Down Expand Up @@ -40,8 +40,8 @@
},
],
)
def test_kubernetes_decorator_validate_kube_labels(labels):
assert validate_kube_labels(labels)
def test_kubernetes_decorator_validate_kube_labels_or_annotations(labels):
assert validate_kube_labels_or_annotations(labels)


@pytest.mark.parametrize(
Expand All @@ -65,10 +65,10 @@ def test_kubernetes_decorator_validate_kube_labels(labels):
{"valid": "test", "invalid": "bißchen"},
],
)
def test_kubernetes_decorator_validate_kube_labels_fail(labels):
def test_kubernetes_decorator_validate_kube_labels_or_annotations_fail(labels):
"""Fail if label contains invalid characters or is too long"""
with pytest.raises(KubernetesException):
validate_kube_labels(labels)
validate_kube_labels_or_annotations(labels)


@pytest.mark.parametrize(
Expand Down