Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kubernetes annotations #2868

Merged
merged 5 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/source/user_guide/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste
- A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the component. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export.
- The referenced PVCs must exist in the Kubernetes namespace where the pipeline nodes are executed.
- Data volumes are not mounted when the pipeline is executed locally.
- **Kubernetes pod annotations**
- A list of [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#attaching-metadata-to-objects) to be attached to the pod that executes the node.
- Format: `annotation-key=annotation-value`. Entries that are empty (`annotation-key=`) are ignored. Entries with a key considered to be [invalid](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set) will raise a validation error after pipeline submission or export.
- Annotations are ignored when the pipeline is executed locally.

- Properties that apply to every generic pipeline node. In this release the following properties are supported:
- **Object storage path prefix**. Elyra stores pipeline input and output artifacts in a cloud object storage bucket. By default these artifacts are located in the `/<pipeline-instance-name>` path. The example below depicts the artifact location for several pipelines in the `pipeline-examples` bucket:
![artifacts default storage layout on object storage](../images/user_guide/pipelines/node-artifacts-on-object-storage.png)
Expand Down Expand Up @@ -153,6 +158,11 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste
- Data volumes are not mounted when the pipeline is executed locally.
- Example: `/mnt/vol1=data-pvc`

**Kubernetes Pod Annotations**
- Optional. A list of [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#attaching-metadata-to-objects) to be attached to the pod that executes the node.
- Format: `annotation-key=annotation-value`. Entries that are empty (`annotation-key=`) are ignored. Entries with a key considered to be [invalid](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set) will raise a validation error after pipeline submission or export.
- Annotations are ignored when the pipeline is executed locally.

5. Associate each node with a comment to document its purpose.

![Add comment to node](../images/user_guide/pipelines/add-comment-to-node.gif)
Expand Down
9 changes: 9 additions & 0 deletions elyra/kfp/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from kubernetes.client.models import V1VolumeMount

from elyra._version import __version__
from elyra.pipeline.pipeline import KubernetesAnnotation
from elyra.pipeline.pipeline import KubernetesSecret
from elyra.pipeline.pipeline import VolumeMount

Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(
workflow_engine: Optional[str] = "argo",
volume_mounts: Optional[List[VolumeMount]] = None,
kubernetes_secrets: Optional[List[KubernetesSecret]] = None,
kubernetes_pod_annotations: Optional[List[KubernetesAnnotation]] = None,
**kwargs,
):
"""Create a new instance of ContainerOp.
Expand All @@ -116,6 +118,7 @@ def __init__(
workflow_engine: Kubeflow workflow engine, defaults to 'argo'
volume_mounts: data volumes to be mounted
kubernetes_secrets: secrets to be made available as environment variables
kubernetes_pod_annotations: annotations to be applied to the pod
kwargs: additional key value pairs to pass e.g. name, image, sidecars & is_exit_handler.
See Kubeflow pipelines ContainerOp definition for more parameters or how to use
https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp
Expand Down Expand Up @@ -144,6 +147,7 @@ def __init__(
self.gpu_limit = gpu_limit
self.volume_mounts = volume_mounts # optional data volumes to be mounted to the pod
self.kubernetes_secrets = kubernetes_secrets # optional secrets to be made available as env vars
self.kubernetes_pod_annotations = kubernetes_pod_annotations # optional annotations

argument_list = []

Expand Down Expand Up @@ -267,6 +271,11 @@ def __init__(
)
)

# add user-provided annotations to pod
if self.kubernetes_pod_annotations:
for annotation in self.kubernetes_pod_annotations:
self.add_pod_annotation(annotation.key, annotation.value)

# If crio volume size is found then assume kubeflow pipelines environment is using CRI-o as
# its container runtime
if self.emptydir_volume_size:
Expand Down
58 changes: 46 additions & 12 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"doc": operation.doc,
"volumes": operation.mounted_volumes,
"secrets": operation.kubernetes_secrets,
"kubernetes_pod_annotations": operation.kubernetes_pod_annotations,
}

if runtime_image_pull_secret is not None:
Expand Down Expand Up @@ -447,6 +448,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"is_generic_operator": operation.is_generic,
"doc": operation.doc,
"volumes": operation.mounted_volumes,
"kubernetes_pod_annotations": operation.kubernetes_pod_annotations,
}

target_ops.append(target_op)
Expand Down Expand Up @@ -499,6 +501,7 @@ def create_pipeline_file(
"render_executor_config_for_custom_op": AirflowPipelineProcessor.render_executor_config_for_custom_op,
"render_secrets_for_generic_op": AirflowPipelineProcessor.render_secrets_for_generic_op,
"render_secrets_for_cos": AirflowPipelineProcessor.render_secrets_for_cos,
"render_executor_config_for_generic_op": AirflowPipelineProcessor.render_executor_config_for_generic_op,
}
template.globals.update(rendering_functions)

Expand Down Expand Up @@ -636,18 +639,49 @@ def render_executor_config_for_custom_op(op: Dict) -> Dict[str, Dict[str, List]]

:returns: a dict defining the volumes and mounts to be rendered in the DAG
"""
executor_config = {"KubernetesExecutor": {"volumes": [], "volume_mounts": []}}
for volume in op.get("volumes", []):
# Define volumes and volume mounts
executor_config["KubernetesExecutor"]["volumes"].append(
{
"name": volume.pvc_name,
"persistentVolumeClaim": {"claimName": volume.pvc_name},
}
)
executor_config["KubernetesExecutor"]["volume_mounts"].append(
{"mountPath": volume.path, "name": volume.pvc_name, "read_only": False}
)

executor_config = {"KubernetesExecutor": {}}

# Handle volume mounts
if op.get("volumes"):
executor_config["KubernetesExecutor"]["volumes"] = []
executor_config["KubernetesExecutor"]["volume_mounts"] = []
for volume in op.get("volumes", []):
# Add volume and volume mount entry
executor_config["KubernetesExecutor"]["volumes"].append(
{
"name": volume.pvc_name,
"persistentVolumeClaim": {"claimName": volume.pvc_name},
}
)
executor_config["KubernetesExecutor"]["volume_mounts"].append(
{"mountPath": volume.path, "name": volume.pvc_name, "read_only": False}
)

# Handle annotations
if op.get("kubernetes_pod_annotations"):
executor_config["KubernetesExecutor"]["annotations"] = {}
for annotation in op.get("kubernetes_pod_annotations", []):
# Add Kubernetes annotation entry
executor_config["KubernetesExecutor"]["annotations"][annotation.key] = annotation.value

return executor_config

@staticmethod
def render_executor_config_for_generic_op(op: Dict) -> Dict[str, Dict[str, List]]:
ptitzler marked this conversation as resolved.
Show resolved Hide resolved
"""
Render annotations defined for the specified generic op
for use in the Airflow DAG template
:returns: a dict defining the annotations to be rendered in the DAG
"""
executor_config = {"KubernetesExecutor": {}}

# Handle annotations
if op.get("kubernetes_pod_annotations"):
executor_config["KubernetesExecutor"]["annotations"] = {}
for annotation in op.get("kubernetes_pod_annotations", []):
# Add Kubernetes annotation entry
executor_config["KubernetesExecutor"]["annotations"][annotation.key] = annotation.value

return executor_config

Expand Down
9 changes: 9 additions & 0 deletions elyra/pipeline/kfp/processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ def _cc_pipeline(
},
volume_mounts=operation.mounted_volumes,
kubernetes_secrets=operation.kubernetes_secrets,
kubernetes_pod_annotations=operation.kubernetes_pod_annotations,
)

if operation.doc:
Expand Down Expand Up @@ -674,6 +675,14 @@ def _cc_pipeline(
V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name)
)

# Add user-specified pod annotations
if operation.kubernetes_pod_annotations:
unique_annotations = []
for annotation in operation.kubernetes_pod_annotations:
if annotation.key not in unique_annotations:
container_op.add_pod_annotation(annotation.key, annotation.value)
unique_annotations.append(annotation.key)

target_ops[operation.id] = container_op
except Exception as e:
# TODO Fix error messaging and break exceptions down into categories
Expand Down
22 changes: 22 additions & 0 deletions elyra/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from typing import Optional

from elyra.pipeline.pipeline_constants import ENV_VARIABLES
from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES

Expand Down Expand Up @@ -110,6 +111,17 @@ def __init__(
# spec) and must be removed from the component_params dict
self._mounted_volumes = self._component_params.pop(MOUNTED_VOLUMES, [])

self._kubernetes_pod_annotations = []
param_annotations = component_params.get(KUBERNETES_POD_ANNOTATIONS)
if (
param_annotations is not None
and isinstance(param_annotations, list)
and (len(param_annotations) == 0 or isinstance(param_annotations[0], KubernetesAnnotation))
):
# The kubernetes_pod_annotations property is an Elyra system property (ie, not defined in the component
# spec) and must be removed from the component_params dict
self._kubernetes_pod_annotations = self._component_params.pop(KUBERNETES_POD_ANNOTATIONS, [])

# Scrub the inputs and outputs lists
self._component_params["inputs"] = Operation._scrub_list(component_params.get("inputs", []))
self._component_params["outputs"] = Operation._scrub_list(component_params.get("outputs", []))
Expand Down Expand Up @@ -158,6 +170,10 @@ def component_params_as_dict(self) -> Dict[str, Any]:
def mounted_volumes(self) -> List["VolumeMount"]:
return self._mounted_volumes

@property
def kubernetes_pod_annotations(self) -> List["KubernetesAnnotation"]:
return self._kubernetes_pod_annotations

@property
def inputs(self) -> Optional[List[str]]:
return self._component_params.get("inputs")
Expand Down Expand Up @@ -549,6 +565,12 @@ class KubernetesSecret:
key: str


@dataclass
class KubernetesAnnotation:
key: str
value: str


class DataClassJSONEncoder(json.JSONEncoder):
"""
A JSON Encoder class to prevent errors during serialization of dataclasses.
Expand Down
3 changes: 2 additions & 1 deletion elyra/pipeline/pipeline_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
ENV_VARIABLES = "env_vars"
MOUNTED_VOLUMES = "mounted_volumes"
KUBERNETES_SECRETS = "kubernetes_secrets"
KUBERNETES_POD_ANNOTATIONS = "kubernetes_pod_annotations"
PIPELINE_META_PROPERTIES = ["name", "description", "runtime"]
# optional static prefix to be used when generating an object name for object storage
COS_OBJECT_PREFIX = "cos_object_prefix"
ELYRA_COMPONENT_PROPERTIES = [MOUNTED_VOLUMES]
ELYRA_COMPONENT_PROPERTIES = [MOUNTED_VOLUMES, KUBERNETES_POD_ANNOTATIONS]
12 changes: 12 additions & 0 deletions elyra/pipeline/pipeline_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@

from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.pipeline import KeyValueList
from elyra.pipeline.pipeline import KubernetesAnnotation
from elyra.pipeline.pipeline import KubernetesSecret
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import VolumeMount
from elyra.pipeline.pipeline_constants import ELYRA_COMPONENT_PROPERTIES
from elyra.pipeline.pipeline_constants import ENV_VARIABLES
from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.pipeline_constants import PIPELINE_DEFAULTS
Expand Down Expand Up @@ -434,6 +436,16 @@ def convert_data_class_properties(self):

self.set_component_parameter(KUBERNETES_SECRETS, secret_objects)

kubernetes_pod_annotations = self.get_component_parameter(KUBERNETES_POD_ANNOTATIONS)
if kubernetes_pod_annotations and isinstance(kubernetes_pod_annotations, KeyValueList):
annotations_objects = []
for annotation_key, annotation_value in kubernetes_pod_annotations.to_dict().items():
# Validation should have verified that the provided values are valid
# Create a KubernetesAnnotation class instance and add to list
annotations_objects.append(KubernetesAnnotation(annotation_key, annotation_value))

self.set_component_parameter(KUBERNETES_POD_ANNOTATIONS, annotations_objects)


class PipelineDefinition(object):
"""
Expand Down
36 changes: 36 additions & 0 deletions elyra/pipeline/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.pipeline import DataClassJSONEncoder
from elyra.pipeline.pipeline import KeyValueList
from elyra.pipeline.pipeline import KubernetesAnnotation
from elyra.pipeline.pipeline import KubernetesSecret
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import PIPELINE_CURRENT_SCHEMA
from elyra.pipeline.pipeline import PIPELINE_CURRENT_VERSION
from elyra.pipeline.pipeline import VolumeMount
from elyra.pipeline.pipeline_constants import ENV_VARIABLES
from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.pipeline_constants import RUNTIME_IMAGE
from elyra.pipeline.pipeline_definition import Node
from elyra.pipeline.pipeline_definition import PipelineDefinition
from elyra.pipeline.processor import PipelineProcessorManager
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.kubernetes import is_valid_annotation_key
from elyra.util.kubernetes import is_valid_kubernetes_key
from elyra.util.kubernetes import is_valid_kubernetes_resource_name
from elyra.util.path import get_expanded_path
Expand Down Expand Up @@ -419,6 +422,7 @@ def _validate_generic_node_properties(self, node: Node, response: ValidationResp
env_vars = node.get_component_parameter(ENV_VARIABLES)
volumes = node.get_component_parameter(MOUNTED_VOLUMES)
secrets = node.get_component_parameter(KUBERNETES_SECRETS)
annotations = node.get_component_parameter(KUBERNETES_POD_ANNOTATIONS)

self._validate_filepath(
node_id=node.id, node_label=node_label, property_name="filename", filename=filename, response=response
Expand All @@ -442,6 +446,8 @@ def _validate_generic_node_properties(self, node: Node, response: ValidationResp
self._validate_mounted_volumes(node.id, node_label, volumes, response=response)
if secrets:
self._validate_kubernetes_secrets(node.id, node_label, secrets, response=response)
if annotations:
self._validate_kubernetes_pod_annotations(node.id, node_label, annotations, response=response)

self._validate_label(node_id=node.id, node_label=node_label, response=response)
if dependencies:
Expand Down Expand Up @@ -490,6 +496,10 @@ async def _validate_custom_component_node_properties(
if volumes and MOUNTED_VOLUMES not in node.elyra_properties_to_skip:
self._validate_mounted_volumes(node.id, node.label, volumes, response=response)

annotations = node.get_component_parameter(KUBERNETES_POD_ANNOTATIONS)
if annotations and KUBERNETES_POD_ANNOTATIONS not in node.elyra_properties_to_skip:
self._validate_kubernetes_pod_annotations(node.id, node.label, annotations, response=response)

for default_parameter in current_parameter_defaults_list:
node_param = node.get_component_parameter(default_parameter)
if self._is_required_property(component_property_dict, default_parameter):
Expand Down Expand Up @@ -707,6 +717,32 @@ def _validate_kubernetes_secrets(
},
)

def _validate_kubernetes_pod_annotations(
self, node_id: str, node_label: str, annotations: List[KubernetesAnnotation], response: ValidationResponse
) -> None:
"""
Checks the format of the user-provided annotations to ensure they're in the correct form
e.g. annotation_key=annotation_value
:param node_id: the unique ID of the node
:param node_label: the given node name or user customized name/label of the node
:param annotations: a KeyValueList of annotations to check
:param response: ValidationResponse containing the issue list to be updated
"""
for annotation in annotations:
# Ensure the annotation key is valid
if not is_valid_annotation_key(annotation.key):
response.add_message(
severity=ValidationSeverity.Error,
message_type="invalidKubernetesAnnotation",
message=f"'{annotation.key}' is not a valid Kubernetes annotation key.",
data={
"nodeID": node_id,
"nodeName": node_label,
"propertyName": KUBERNETES_POD_ANNOTATIONS,
"value": KeyValueList.to_str(annotation.key, annotation.value),
},
)

def _validate_filepath(
self,
node_id: str,
Expand Down
Loading