Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify component naming #815

Merged
merged 4 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import json
import pkgutil
import pydoc
import re
import types
import typing as t
from dataclasses import dataclass
Expand Down Expand Up @@ -101,7 +100,7 @@ def __init__(
tags: t.Optional[t.List[str]] = None,
):
spec_dict: t.Dict[str, t.Any] = {
"name": name,
"name": self.sanitized_component_name(name),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the only place where the incoming name is sanitized and stored.

"image": image,
}

Expand Down Expand Up @@ -180,27 +179,9 @@ def from_dict(cls, component_spec_dict: t.Dict[str, t.Any]) -> "ComponentSpec":
def name(self):
return self._specification["name"]

@property
def component_folder_name(self):
"""Cleans and converts a name to a proper folder name."""
return self._specification["name"].lower().replace(" ", "_")

@property
def sanitized_component_name(self):
"""Cleans and converts a name to be kfp V2 compatible.

Taken from https://github.com/kubeflow/pipelines/blob/
cfe671c485d4ee8514290ee81ca2785e8bda5c9b/sdk/python/kfp/dsl/utils.py#L52
"""
return (
re.sub(
"-+",
"-",
re.sub("[^-0-9a-z]+", "-", self._specification["name"].lower()),
)
.lstrip("-")
.rstrip("-")
)
def sanitized_component_name(self, name) -> str:
"""Cleans and converts a component name."""
return name.lower().replace(" ", "_")

@property
def description(self):
Expand Down Expand Up @@ -536,8 +517,8 @@ def outer_produces(self) -> t.Mapping[str, Field]:

@property
def component_folder_name(self) -> str:
"""Cleans and converts a name to a proper folder name."""
return self._component_spec.component_folder_name
"""Get the component folder name."""
return self._component_spec.name

@property
def previous_index(self) -> t.Optional[str]:
Expand Down
50 changes: 29 additions & 21 deletions src/fondant/pipeline/compiler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
import re
import shlex
import tempfile
import textwrap
Expand Down Expand Up @@ -174,7 +175,7 @@ def _generate_spec(

component_cache_key = None

for component_name, component in pipeline._graph.items():
for component_id, component in pipeline._graph.items():
component_op = component["operation"]

component_cache_key = component_op.get_component_cache_key(
Expand All @@ -185,11 +186,11 @@ def _generate_spec(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=path,
component_id=component_name,
component_id=component_id,
cache_key=component_cache_key,
)

logger.info(f"Compiling service for {component_name}")
logger.info(f"Compiling service for {component_id}")

entrypoint = self._build_entrypoint(component_op.image)

Expand All @@ -201,7 +202,7 @@ def _generate_spec(
[
"--output_manifest_path",
f"{path}/{metadata.pipeline_name}/{metadata.run_id}/"
f"{component_name}/manifest.json",
f"{component_id}/manifest.json",
],
)

Expand Down Expand Up @@ -239,7 +240,7 @@ def _generate_spec(
f"{DASK_DIAGNOSTIC_DASHBOARD_PORT}:{DASK_DIAGNOSTIC_DASHBOARD_PORT}",
)

services[component_name] = {
services[component_id] = {
"entrypoint": entrypoint,
"command": command,
"depends_on": depends_on,
Expand All @@ -250,26 +251,26 @@ def _generate_spec(
},
}

self._set_configuration(services, component_op, component_name)
self._set_configuration(services, component_op, component_id)

if component_op.dockerfile_path is not None:
logger.info(
f"Found Dockerfile for {component_name}, adding build step.",
f"Found Dockerfile for {component_id}, adding build step.",
)
services[component_name]["build"] = {
services[component_id]["build"] = {
"context": str(component_op.component_dir.absolute()),
"args": build_args,
}
else:
services[component_name]["image"] = component_op.component_spec.image
services[component_id]["image"] = component_op.component_spec.image

return {
"name": pipeline.name,
"version": "3.8",
"services": services,
}

def _set_configuration(self, services, fondant_component_operation, component_name):
def _set_configuration(self, services, fondant_component_operation, component_id):
resources_dict = fondant_component_operation.resources.to_dict()

accelerator_name = resources_dict.pop("accelerator_name")
Expand All @@ -288,7 +289,7 @@ def _set_configuration(self, services, fondant_component_operation, component_na
raise InvalidPipelineDefinition(msg)

if accelerator_name == "GPU":
services[component_name]["deploy"] = {
services[component_id]["deploy"] = {
"resources": {
"reservations": {
"devices": [
Expand Down Expand Up @@ -353,48 +354,55 @@ def from_fondant_component_spec(
},
}

cleaned_component_name = fondant_component.sanitized_component_name

kfp_safe_name = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice name! Makes it clear.

re.sub(
"-+",
"-",
re.sub("[^-0-9a-z]+", "-", fondant_component.name.lower()),
)
.lstrip("-")
.rstrip("-")
)
specification = {
"components": {
"comp-"
+ cleaned_component_name: {
"executorLabel": "exec-" + cleaned_component_name,
+ kfp_safe_name: {
"executorLabel": "exec-" + kfp_safe_name,
"inputDefinitions": input_definitions,
},
},
"deploymentSpec": {
"executors": {
"exec-"
+ cleaned_component_name: {
+ kfp_safe_name: {
"container": {
"command": command,
"image": image_uri,
},
},
},
},
"pipelineInfo": {"name": cleaned_component_name},
"pipelineInfo": {"name": kfp_safe_name},
"root": {
"dag": {
"tasks": {
cleaned_component_name: {
kfp_safe_name: {
"cachingOptions": {"enableCache": True},
"componentRef": {"name": "comp-" + cleaned_component_name},
"componentRef": {"name": "comp-" + kfp_safe_name},
"inputs": {
"parameters": {
param: {"componentInputParameter": param}
for param in input_definitions["parameters"]
},
},
"taskInfo": {"name": cleaned_component_name},
"taskInfo": {"name": kfp_safe_name},
},
},
},
"inputDefinitions": input_definitions,
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-2.0.1",
"sdkVersion": "kfp-2.6.0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any side effects from increasing the version?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is now in line with the KFP version we currently use.

}
return cls(specification)

Expand Down
23 changes: 11 additions & 12 deletions src/fondant/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class ComponentOp:

def __init__(
self,
name: str,
image: Image,
component_spec: ComponentSpec,
*,
Expand All @@ -152,7 +151,6 @@ def __init__(
resources: t.Optional[Resources] = None,
component_dir: t.Optional[Path] = None,
) -> None:
self.name = name
self.image = image
self.component_spec = component_spec
self.input_partition_rows = input_partition_rows
Expand Down Expand Up @@ -195,13 +193,11 @@ def from_component_yaml(cls, path, **kwargs) -> "ComponentOp":
component_spec = ComponentSpec.from_file(
component_dir / cls.COMPONENT_SPEC_NAME,
)
name = component_spec.component_folder_name

image = Image(
base_image=component_spec.image,
)
return cls(
name=name,
image=image,
component_spec=component_spec,
component_dir=component_dir,
Expand Down Expand Up @@ -233,7 +229,6 @@ def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp":
)

operation = cls(
name,
image,
component_spec,
**kwargs,
Expand Down Expand Up @@ -278,7 +273,7 @@ def _configure_caching_from_image_tag(

if image_tag == "latest":
logger.warning(
f"Component `{self.name}` has an image tag set to latest. "
f"Component `{self.component_spec.name}` has an image tag set to latest. "
f"Caching for the component will be disabled to prevent"
f" unpredictable behavior due to images updates",
)
Expand Down Expand Up @@ -308,6 +303,10 @@ def _get_registry_path(name: str) -> Path:
raise ValueError(msg)
return component_dir

@property
def component_name(self) -> str:
return self.component_spec.name

def get_component_cache_key(
self,
previous_component_cache: t.Optional[str] = None,
Expand Down Expand Up @@ -393,11 +392,11 @@ def register_operation(
output_dataset: t.Optional["Dataset"],
) -> None:
dependencies = []
for operation_name, info in self._graph.items():
for component_name, info in self._graph.items():
if info["output_dataset"] == input_dataset:
dependencies.append(operation_name)
dependencies.append(component_name)

self._graph[operation.name] = {
self._graph[operation.component_name] = {
"operation": operation,
"dependencies": dependencies,
"output_dataset": output_dataset,
Expand Down Expand Up @@ -456,7 +455,7 @@ def read(
pipeline_name=self.name,
base_path=self.base_path,
run_id=self.get_run_id(),
component_id=operation.name,
component_id=operation.component_name,
)
dataset = Dataset(manifest, pipeline=self)

Expand Down Expand Up @@ -549,8 +548,8 @@ def _validate_pipeline_definition(self, run_id: str):
) in operation_spec.outer_consumes.items():
if component_field_name not in manifest.fields:
msg = (
f"Component '{component_op.name}' is trying to invoke the field "
f"'{component_field_name}', which has not been defined or created "
f"Component '{component_op.component_name}' is trying to invoke the"
f"field '{component_field_name}', which has not been defined or created"
f"in the previous components. \n"
f"Available field names: {list(manifest.fields.keys())}"
)
Expand Down
7 changes: 2 additions & 5 deletions tests/component/test_data_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ def test_write_dataset(
data_writer.write_dataframe(dataframe, dask_client)
# read written data and assert
dataframe = dd.read_parquet(
temp_dir
/ manifest.pipeline_name
/ manifest.run_id
/ component_spec.component_folder_name,
temp_dir / manifest.pipeline_name / manifest.run_id / component_spec.name,
)
assert len(dataframe) == NUMBER_OF_TEST_ROWS
assert list(dataframe.columns) == columns
Expand Down Expand Up @@ -181,7 +178,7 @@ def test_write_dataset_custom_produces(
temp_dir
/ manifest.pipeline_name
/ manifest.run_id
/ component_spec_produces.component_folder_name,
/ component_spec_produces.name,
)
assert len(dataframe) == NUMBER_OF_TEST_ROWS
assert list(dataframe.columns) == expected_columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ root:
name: example-component
inputDefinitions: *id001
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.1
sdkVersion: kfp-2.6.0
2 changes: 1 addition & 1 deletion tests/core/examples/component_specs/valid_component.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Example component
name: example_component
image: example_component:latest
description: This is an example component
tags:
Expand Down
4 changes: 2 additions & 2 deletions tests/core/test_component_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_attribute_access(valid_fondant_schema):
"""
fondant_component = ComponentSpec.from_dict(valid_fondant_schema)

assert fondant_component.name == "Example component"
assert fondant_component.name == "example_component"
assert fondant_component.description == "This is an example component"
assert fondant_component.consumes["images"].type == Type("binary")
assert fondant_component.consumes["embeddings"].type == Type.list(
Expand All @@ -91,7 +91,7 @@ def test_component_spec_no_args(valid_fondant_schema_no_args):
"""Test that a component spec without args is supported."""
fondant_component = ComponentSpec.from_dict(valid_fondant_schema_no_args)

assert fondant_component.name == "Example component"
assert fondant_component.name == "example_component"
assert fondant_component.description == "This is an example component"
assert fondant_component.args == fondant_component.default_arguments

Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_evolve_manifest():

assert output_manifest.base_path == input_manifest.base_path
assert output_manifest.run_id == run_id
assert output_manifest.index.location == f"/{run_id}/{spec.component_folder_name}"
assert output_manifest.index.location == f"/{run_id}/{spec.name}"
assert output_manifest.fields["captions"].type.name == "string"


Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_manifest_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,5 @@ def test_component_spec_location_update():
)

assert evolved_manifest.index.location.endswith(
component_spec.component_folder_name,
component_spec.name,
)
Loading
Loading