diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
index 70d8693b7..faf76baa7 100644
--- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
+++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
@@ -3,7 +3,7 @@ FROM --platform=linux/amd64 python:3.8-slim
 ## System dependencies
 RUN apt-get update && \
     apt-get upgrade -y && \
-    apt-get install git -y
+    apt-get install git graphviz -y
 
 # install requirements
 COPY requirements.txt /
@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt
 
 # Install Fondant
 # This is split from other requirements to leverage caching
-ARG FONDANT_VERSION=main
+ARG FONDANT_VERSION=d764ad42887b628883fe916a8b0605a13c6b08e0
 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
 
 # Set the working directory to the component folder
diff --git a/pyproject.toml b/pyproject.toml
index 7b837de9f..8d80b242b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,7 +42,9 @@ classifiers = [
 
 [tool.poetry.dependencies]
 python = ">= 3.8"
-dask = {extras = ["dataframe"], version = ">= 2023.4.1"}
+dask = {extras = ["dataframe", "distributed"], version = ">= 2023.4.1"}
+graphviz = ">= 0.20.1"
+bokeh = ">= 3.1.1"
 importlib-resources = { version = ">= 1.3", python = "<3.9" }
 jsonschema = ">= 4.18"
 pyarrow = ">= 11.0.0"
diff --git a/src/fondant/component.py b/src/fondant/component.py
index 724f4ea8b..f5c743e3e 100644
--- a/src/fondant/component.py
+++ b/src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         raise NotImplementedError
 
 
+class DaskWriteComponent(BaseComponent):
+    """Component that accepts a Dask DataFrame and writes its contents."""
+
+    def write(self, dataframe: dd.DataFrame) -> None:
+        raise NotImplementedError
+
+
 class PandasTransformComponent(BaseComponent):
     """Component that transforms the incoming dataset partition per partition
     as a pandas DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         raise NotImplementedError
 
 
-class DaskWriteComponent(BaseComponent):
-    """Component that accepts a Dask DataFrame and writes its contents."""
-
-    def write(self, dataframe: dd.DataFrame) -> None:
-        raise NotImplementedError
-
-
 Component = t.TypeVar("Component", bound=BaseComponent)
 """Component type which can represents any of the subclasses of BaseComponent"""
diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py
index ad204c6e6..09e26be08 100644
--- a/src/fondant/component_spec.py
+++ b/src/fondant/component_spec.py
@@ -256,6 +256,13 @@ def from_fondant_component_spec(
                 "type": "JsonObject",
                 "default": "None",
             },
+            {
+                "name": "output_partition_size",
+                "description": "The size of the output partitions, defaults"
+                " to 250MB. Set to None to disable partitioning the output",
+                "type": "String",
+                "default": "250MB",
+            },
             *(
                 {
                     "name": arg.name,
@@ -285,6 +292,8 @@ def from_fondant_component_spec(
                 {"inputValue": "metadata"},
                 "--component_spec",
                 {"inputValue": "component_spec"},
+                "--output_partition_size",
+                {"inputValue": "output_partition_size"},
                 *cls._dump_args(fondant_component.args.values()),
                 "--output_manifest_path",
                 {"outputPath": "output_manifest_path"},
diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py
index b3c519273..8494dd55e 100644
--- a/src/fondant/data_io.py
+++ b/src/fondant/data_io.py
@@ -1,4 +1,5 @@
 import logging
+import os
 import typing as t
 
 import dask.dataframe as dd
@@ -14,9 +15,37 @@ class DataIO:
     def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None:
         self.manifest = manifest
         self.component_spec = component_spec
+        self.diagnostics_path = (
+            f"{self.manifest.base_path}/{self.manifest.component_id}"
+        )
 
 
 class DaskDataLoader(DataIO):
+    def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec):
+        super().__init__(manifest=manifest, component_spec=component_spec)
+
+    @staticmethod
+    def partition_loaded_dataframe(dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Function that repartitions the loaded dataframe depending on its current number
+        of partitions and the available workers.
+
+        Returns:
+            The repartitioned dataframe.
+        """
+        n_partitions = dataframe.npartitions
+        n_workers = os.cpu_count()
+        logger.info(
+            f"The number of partitions of the input dataframe is {n_partitions}. The "
+            f"available number of workers is {n_workers}.",
+        )
+        if n_partitions < n_workers:
+            dataframe = dataframe.repartition(npartitions=n_workers)
+            logger.info(
+                "Repartitioning the data before transforming to maximize worker usage",
+            )
+
+        return dataframe
+
     def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
         """
         Function that loads a subset from the manifest as a Dask dataframe.
@@ -80,12 +109,37 @@ def load_dataframe(self) -> dd.DataFrame:
                 how="left",
             )
 
+        dataframe = self.partition_loaded_dataframe(dataframe)
+
         logging.info(f"Columns of dataframe: {list(dataframe.columns)}")
 
         return dataframe
 
 
 class DaskDataWriter(DataIO):
+    def __init__(
+        self,
+        *,
+        manifest: Manifest,
+        component_spec: ComponentSpec,
+        output_partition_size: t.Optional[str] = None,
+    ):
+        super().__init__(manifest=manifest, component_spec=component_spec)
+        self.output_partition_size = output_partition_size
+
+    def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Function that repartitions the written dataframe into smaller partitions based
+        on a given partition size.
+        """
+        if self.output_partition_size:
+            dataframe = dataframe.repartition(
+                partition_size=self.output_partition_size,
+            )
+            logger.info(
+                f"Repartitioning the written data such that the memory per partition is"
+                f" {self.output_partition_size}",
+            )
+        return dataframe
+
     def write_dataframe(self, dataframe: dd.DataFrame) -> None:
         write_tasks = []
 
@@ -159,6 +213,8 @@ def _write_subset(
 
         schema = {field.name: field.type.value for field in subset_spec.fields.values()}
 
+        dataframe = self.partition_written_dataframe(dataframe)
+
         return self._create_write_task(dataframe, location=location, schema=schema)
 
     @staticmethod
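The two repartitioning paths added to `data_io.py` above use different `repartition` modes: a partition count when loading (so every worker gets a task) and a byte size when writing. Below is a minimal standalone sketch of both calls, assuming only `dask[dataframe]` (already required in `pyproject.toml`); the toy dataframe and the 250MB figure are illustrative, not Fondant's actual wiring.

```python
import os

import dask.dataframe as dd
import pandas as pd

# Toy input with fewer partitions than available workers, mirroring the
# condition checked in partition_loaded_dataframe.
dataframe = dd.from_pandas(pd.DataFrame({"a": range(10_000)}), npartitions=2)

n_workers = os.cpu_count()
if dataframe.npartitions < n_workers:
    # Count-based repartitioning: aim for one partition per worker so the
    # transform step keeps every worker busy.
    dataframe = dataframe.repartition(npartitions=n_workers)

# Size-based repartitioning, as done before writing: Dask parses the byte
# size string and splits/merges partitions to approximate that size.
dataframe = dataframe.repartition(partition_size="250MB")

print(dataframe.npartitions)
```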
diff --git a/src/fondant/exceptions.py b/src/fondant/exceptions.py
index 8ce549ed7..019ab0644 100644
--- a/src/fondant/exceptions.py
+++ b/src/fondant/exceptions.py
@@ -15,6 +15,10 @@ class InvalidComponentSpec(ValidationError, FondantException):
     """Thrown when a component spec cannot be validated against the schema."""
 
 
+class InvalidComponentOpDefinition(ValidationError, FondantException):
+    """Thrown when a ComponentOp is invalid."""
+
+
 class InvalidPipelineDefinition(ValidationError, FondantException):
     """Thrown when a pipeline definition is invalid."""
 
diff --git a/src/fondant/executor.py b/src/fondant/executor.py
index aca8a235d..1aa9a1b32 100644
--- a/src/fondant/executor.py
+++ b/src/fondant/executor.py
@@ -28,6 +28,20 @@
 logger = logging.getLogger(__name__)
 
+DASK_LOGS_TO_SUPPRESS = [
+    "distributed.process",
+    "distributed.scheduler",
+    "distributed.nanny",
+    "distributed.worker",
+    "distributed.core",
+    "distributed.comm.tcp",
+    "distributed.batched",
+]
+
+for logger_name in DASK_LOGS_TO_SUPPRESS:
+    dask_logger = logging.getLogger(logger_name)
+    dask_logger.setLevel(logging.WARNING)
+
 
 class Executor(t.Generic[Component]):
     """An executor executes a Component."""
@@ -39,13 +53,15 @@ def __init__(
         input_manifest_path: t.Union[str, Path],
         output_manifest_path: t.Union[str, Path],
         metadata: t.Dict[str, t.Any],
-        user_arguments: t.Dict[str, Argument],
+        user_arguments: t.Dict[str, t.Any],
+        output_partition_size: t.Optional[str] = "250MB",
     ) -> None:
         self.spec = spec
         self.input_manifest_path = input_manifest_path
         self.output_manifest_path = output_manifest_path
         self.metadata = metadata
         self.user_arguments = user_arguments
+        self.output_partition_size = output_partition_size
 
     @classmethod
     def from_file(
@@ -85,7 +101,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor":
         input_manifest_path = args_dict.pop("input_manifest_path")
         output_manifest_path = args_dict.pop("output_manifest_path")
         metadata = args_dict.pop("metadata")
-
+        output_partition_size = args_dict.pop("output_partition_size")
         metadata = json.loads(metadata) if metadata else {}
 
         return cls(
@@ -94,6 +110,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor":
             output_manifest_path=output_manifest_path,
             metadata=metadata,
             user_arguments=args_dict,
+            output_partition_size=output_partition_size,
         )
 
     @classmethod
@@ -166,7 +183,12 @@ def _execute_component(
 
     def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
         """Create a data writer given a manifest and writes out the index and subsets."""
-        data_writer = DaskDataWriter(manifest=manifest, component_spec=self.spec)
+        data_writer = DaskDataWriter(
+            manifest=manifest,
+            component_spec=self.spec,
+            output_partition_size=self.output_partition_size,
+        )
+
         data_writer.write_dataframe(dataframe)
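Note that the suppression loop in `executor.py` binds each Dask logger to its own name (`dask_logger`) rather than reusing the module-level `logger`: reusing it would rebind `logger` to the last entry in the list, capped at WARNING, and the executor's own info logs would silently disappear. A small, standard-library-only sketch of the difference (the two logger names are a shortened stand-in for the full list above):

```python
import logging

logger = logging.getLogger(__name__)

# Same pattern as in executor.py, reduced to two loggers for brevity.
for logger_name in ["distributed.worker", "distributed.batched"]:
    dask_logger = logging.getLogger(logger_name)
    dask_logger.setLevel(logging.WARNING)

# The module-level logger is untouched, so component logs keep flowing.
assert logger.name == __name__

# Had the loop variable been called `logger`, the name would now point at
# "distributed.batched" and later logger.info(...) calls in this module
# would be dropped by the WARNING level set inside the loop.
```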
diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py
index 9693af05f..6fa7e1a6b 100644
--- a/src/fondant/pipeline.py
+++ b/src/fondant/pipeline.py
@@ -12,7 +12,7 @@
 from importlib_resources import files  # type: ignore
 
 from fondant.component_spec import ComponentSpec
-from fondant.exceptions import InvalidPipelineDefinition
+from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition
 from fondant.import_utils import is_kfp_available
 from fondant.manifest import Manifest
 
@@ -32,6 +32,8 @@ class ComponentOp:
     Arguments:
         component_dir: The path to the component directory.
         arguments: A dictionary containing the argument name and value for the operation.
+        output_partition_size: The size of the partitions of the written dataset. Defaults
+            to 250MB, set to None to disable partitioning the output.
         number_of_gpus: The number of gpus to assign to the operation
         node_pool_name: The name of the node pool to which the operation will be assigned.
         p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths,
@@ -57,13 +59,15 @@ def __init__(
         component_dir: t.Union[str, Path],
         *,
         arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        output_partition_size: t.Optional[str] = "250MB",
         number_of_gpus: t.Optional[int] = None,
         node_pool_name: t.Optional[str] = None,
         p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
         ephemeral_storage_size: t.Optional[str] = None,
     ) -> None:
         self.component_dir = Path(component_dir)
-        self.arguments = arguments or {}
+        self.output_partitioning_size = output_partition_size
+        self.arguments = self._set_arguments(arguments)
 
         self.component_spec = ComponentSpec.from_file(
             self.component_dir / self.COMPONENT_SPEC_NAME,
@@ -75,6 +79,36 @@ def __init__(
         self.p_volumes = p_volumes
         self.ephemeral_storage_size = ephemeral_storage_size
 
+    def _set_arguments(
+        self,
+        arguments: t.Optional[t.Dict[str, t.Any]],
+    ) -> t.Dict[str, t.Any]:
+        """Set component arguments based on the provided arguments and the relevant
+        ComponentOp parameters.
+        """
+
+        def _validate_file_size(file_size):
+            # Regular expression pattern matching file size notations: KB, MB, GB or TB
+            pattern = r"^\d+(?:\.\d+)?(?:KB|MB|GB|TB)$"
+
+            # Check whether the provided file_size matches the pattern (case-insensitive)
+            return bool(re.match(pattern, file_size, re.I))
+
+        arguments = arguments or {}
+
+        if self.output_partitioning_size is not None:
+            if not _validate_file_size(file_size=str(self.output_partitioning_size)):
+                msg = (
+                    f"Invalid partition size defined `{self.output_partitioning_size}`,"
+                    " the partition size must be a number followed by a file size"
+                    " notation, e.g. '250MB'"
+                )
+                raise InvalidComponentOpDefinition(msg)
+
+            arguments["output_partition_size"] = self.output_partitioning_size
+
+        return arguments
+
     @property
     def dockerfile_path(self) -> t.Optional[Path]:
         path = self.component_dir / "Dockerfile"
diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml
index 46ed2260b..f1b5364db 100644
--- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml
+++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml
@@ -9,6 +9,8 @@ services:
       - /foo/bar/first_component/manifest.json
       - --storage_args
       - a dummy string arg
+      - --output_partition_size
+      - 250MB
       - --component_spec
       - '{"name": "First component", "description": "This is an example component",
         "image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -48,6 +50,8 @@ services:
       - a dummy string arg
       - --some_list
       - '[1, 2, 3]'
+      - --output_partition_size
+      - 10MB
       - --component_spec
       - '{"name": "Third component", "description": "This is an example component",
         "image": "example_component:latest", "consumes": {"images": {"fields": {"data":
@@ -63,4 +67,4 @@ services:
       second_component:
         condition: service_completed_successfully
     volumes: []
-version: '3.8'
\ No newline at end of file
+version: '3.8'
diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml
index 44bfc162e..4e02ca245 100644
--- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml
+++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml
@@ -9,6 +9,8 @@ services:
       - /foo/bar/first_component/manifest.json
       - --storage_args
       - a dummy string arg
+      - --output_partition_size
+      - 250MB
      - --component_spec
       - '{"name": "First component", "description": "This is an example component",
         "image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -26,6 +28,8 @@ services:
       - '0'
       - --padding
       - '0'
+      - --output_partition_size
+      - 250MB
       - --component_spec
       - '{"name": "Image cropping", "description": "Component that removes single-colored
         borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev",
diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml
index 5f2e33690..57ccdb565 100644
--- a/tests/example_specs/component_specs/kubeflow_component.yaml
+++ b/tests/example_specs/component_specs/kubeflow_component.yaml
@@ -11,6 +11,11 @@ inputs:
   description: The component specification as a dictionary
   type: JsonObject
   default: None
+- name: output_partition_size
+  description: The size of the output partitions, defaults to 250MB. Set to None
+    to disable partitioning the output
+  type: String
+  default: 250MB
 - name: storage_args
   description: Storage arguments
   type: String
@@ -30,7 +35,9 @@ implementation:
     - inputValue: metadata
     - --component_spec
     - inputValue: component_spec
+    - --output_partition_size
+    - inputValue: output_partition_size
     - --storage_args
     - inputValue: storage_args
     - --output_manifest_path
-    - outputPath: output_manifest_path
\ No newline at end of file
+    - outputPath: output_manifest_path
diff --git a/tests/test_compiler.py b/tests/test_compiler.py
index 7ad7382c9..d768805be 100644
--- a/tests/test_compiler.py
+++ b/tests/test_compiler.py
@@ -17,10 +17,12 @@
         ComponentOp(
             Path(COMPONENTS_PATH / "example_1" / "first_component"),
             arguments={"storage_args": "a dummy string arg"},
+            output_partition_size="250MB",
         ),
         ComponentOp(
             Path(COMPONENTS_PATH / "example_1" / "second_component"),
             arguments={"storage_args": "a dummy string arg"},
+            output_partition_size=None,
         ),
         ComponentOp(
             Path(COMPONENTS_PATH / "example_1" / "fourth_component"),
@@ -28,6 +30,7 @@
                 "storage_args": "a dummy string arg",
                 "some_list": [1, 2, 3],
             },
+            output_partition_size="10MB",
         ),
     ],
 ),
diff --git a/tests/test_component.py b/tests/test_component.py
index 5fa862d8f..707ea88c0 100644
--- a/tests/test_component.py
+++ b/tests/test_component.py
@@ -126,7 +126,7 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]:
 
 @pytest.mark.usefixtures("_patched_data_writing")
 def test_load_component():
-    # Mock CLI argumentsload
+    # Mock CLI arguments load
     sys.argv = [
         "",
         "--metadata",
@@ -175,6 +175,8 @@ def test_dask_transform_component():
         "success",
         "--value",
         "1",
+        "--output_partition_size",
+        "250MB",
         "--output_manifest_path",
         str(components_path / "output_manifest.json"),
         "--component_spec",
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index cbd19e843..6b58c7abf 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -3,7 +3,7 @@
 import pytest
 import yaml
 
-from fondant.exceptions import InvalidPipelineDefinition
+from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition
 from fondant.pipeline import ComponentOp, Pipeline
 
 valid_pipeline_path = Path(__file__).parent / "example_pipelines/valid_pipeline"
@@ -23,6 +23,49 @@ def default_pipeline_args():
     }
 
 
+@pytest.mark.parametrize(
+    "valid_pipeline_example",
+    [
+        (
+            "example_1",
+            ["first_component", "second_component", "third_component"],
+        ),
+    ],
+)
+def test_component_op(
+    valid_pipeline_example,
+):
+    component_args = {"storage_args": "a dummy string arg"}
+    example_dir, component_names = valid_pipeline_example
+    components_path = Path(valid_pipeline_path / example_dir)
+
+    ComponentOp(
+        Path(components_path / component_names[0]),
+        arguments=component_args,
+        output_partition_size=None,
+    )
+
+    ComponentOp(
+        Path(components_path / component_names[0]),
+        arguments=component_args,
+        output_partition_size="250MB",
+    )
+
+    with pytest.raises(InvalidComponentOpDefinition):
+        ComponentOp(
+            Path(components_path / component_names[0]),
+            arguments=component_args,
+            output_partition_size=10,
+        )
+
+    with pytest.raises(InvalidComponentOpDefinition):
+        ComponentOp(
+            Path(components_path / component_names[0]),
+            arguments=component_args,
+            output_partition_size="250 MB",
+        )
+
+
 @pytest.mark.parametrize(
     "valid_pipeline_example",
     [
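For reference, the partition-size pattern used in `_set_arguments` can be checked in isolation. A standalone sketch, using the same regular expression as `pipeline.py` (expected results shown in the trailing comment):

```python
import re

# Digits, an optional decimal part, then a KB/MB/GB/TB unit, matched
# case-insensitively, exactly as in _validate_file_size above.
PATTERN = r"^\d+(?:\.\d+)?(?:KB|MB|GB|TB)$"

for candidate in ["250MB", "1.5GB", "10mb", "250 MB", "250", "MB"]:
    ok = bool(re.match(PATTERN, candidate, re.I))
    print(f"{candidate!r}: {ok}")

# '250MB': True, '1.5GB': True, '10mb': True (re.I),
# '250 MB': False, '250': False, 'MB': False
```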