From 2683b937dadab33dc60a5509f1a5e699ee698673 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Tue, 11 Jul 2023 13:13:17 +0200
Subject: [PATCH 01/21] optimize image download component

---
 components/download_images/src/main.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/components/download_images/src/main.py b/components/download_images/src/main.py
index 017001e0d..39574415a 100644
--- a/components/download_images/src/main.py
+++ b/components/download_images/src/main.py
@@ -7,6 +7,7 @@
 """
 import io
 import logging
+import os
 import traceback
 import urllib
 
@@ -124,8 +125,9 @@ def transform(
         max_aspect_ratio=max_aspect_ratio,
     )
 
-    # Remove duplicates from laion retrieval
+    # Remove duplicates from laion retrieval (global)
    dataframe = dataframe.drop_duplicates()
+    dataframe = dataframe.repartition(npartitions=os.cpu_count())
 
     dataframe = dataframe.apply(
         lambda example: download_image_with_retry(
@@ -146,7 +148,7 @@
 
     # Remove images that could not be fetched
     dataframe = dataframe.dropna()
-
+    dataframe = dataframe.repartition(partition_size="250MB")
     return dataframe
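The patch above leans on two Dask repartitioning idioms that recur throughout this series. A minimal standalone sketch of both follows; the data and URLs are made up for illustration and this is not code from the repository:

    import os

    import dask.dataframe as dd
    import pandas as pd

    # A single-partition dataframe, as a retrieval step typically produces.
    pdf = pd.DataFrame({"images_url": [f"https://example.com/{i}.jpg" for i in range(10_000)]})
    ddf = dd.from_pandas(pdf, npartitions=1)

    # Spread rows over one partition per core so a row-wise download step runs in parallel.
    ddf = ddf.repartition(npartitions=os.cpu_count())

    # ... download / transform work would happen here ...

    # Cap partition size before writing so output files stay near 250MB each.
    ddf = ddf.repartition(partition_size="250MB")

Without the first call, a dataframe loaded as one partition keeps every worker but one idle; without the second, partitions inflated by downloaded image bytes can produce unwieldy output files.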
From 1b7521cb9bf76b8a7323609217db9865b79bdacc Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Mon, 17 Jul 2023 17:54:40 +0200
Subject: [PATCH 02/21] add dask diagnostics to fondant

---
 pyproject.toml         |  3 ++-
 src/fondant/data_io.py | 43 ++++++++++++++++++++++++++----------------
 2 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index d205a96bf..8e4c2c9a1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,7 +42,8 @@ classifiers = [
 
 [tool.poetry.dependencies]
 python = ">= 3.8"
-dask = {extras = ["dataframe"], version = ">= 2023.4.1"}
+dask = {extras = ["dataframe", "distributed"], version = ">= 2023.4.1"}
+graphviz = ">= 0.20.1"
 importlib-resources = { version = ">= 1.3", python = "<3.9" }
 jsonschema = ">= 4.18"
 pyarrow = ">= 11.0.0"

diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py
index b3c519273..126bdf951 100644
--- a/src/fondant/data_io.py
+++ b/src/fondant/data_io.py
@@ -3,6 +3,7 @@
 
 import dask.dataframe as dd
 from dask.diagnostics import ProgressBar
+from dask.distributed import performance_report
 
 from fondant.component_spec import ComponentSpec, ComponentSubset
 from fondant.manifest import Manifest
@@ -14,6 +15,11 @@ class DataIO:
     def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None:
         self.manifest = manifest
         self.component_spec = component_spec
+        self.diagnostics_path = f"{self.manifest.base_path}/" \
+                                f"{self.manifest.component_id}/" \
+                                f"{self.manifest.run_id}
+        self.performance_report_path = f"{self.diagnostics_path}/dask_report.html"
+        self.execution_graph_path = f"{self.diagnostics_path}/execution_graph.png"
 
 
 class DaskDataLoader(DataIO):
@@ -87,6 +93,9 @@ def load_dataframe(self) -> dd.DataFrame:
 
 class DaskDataWriter(DataIO):
     def write_dataframe(self, dataframe: dd.DataFrame) -> None:
+        logging.info(f"Saving execution graph to {self.execution_graph_path}")
+        dataframe.visualize(self.execution_graph_path)
+
         write_tasks = []
 
         dataframe.index = dataframe.index.rename("id").astype("string")
@@ -114,15 +123,17 @@ def write_dataframe(self, dataframe: dd.DataFrame) -> None:
             write_tasks.append(write_subset_task)
 
         with ProgressBar():
-            logging.info("Writing data...")
-            dd.compute(*write_tasks)
+            with performance_report(filename=self.performance_report_path):
+                logging.info("Writing data...")
+                logging.info(f"Saving performance report to {self.performance_report_path}")
+                dd.compute(*write_tasks)
 
     @staticmethod
     def _extract_subset_dataframe(
-        dataframe: dd.DataFrame,
-        *,
-        subset_name: str,
-        subset_spec: ComponentSubset,
+            dataframe: dd.DataFrame,
+            *,
+            subset_name: str,
+            subset_spec: ComponentSubset,
     ) -> dd.DataFrame:
         """Create subset dataframe to save with the original field name as the column name."""
         # Create a new dataframe with only the columns needed for the output subset
@@ -140,17 +151,17 @@ def _extract_subset_dataframe(
 
         # Remove the subset prefix from the column names
         subset_df = subset_df.rename(
-            columns={col: col[(len(f"{subset_name}_")) :] for col in subset_columns},
+            columns={col: col[(len(f"{subset_name}_")):] for col in subset_columns},
         )
 
         return subset_df
 
     def _write_subset(
-        self,
-        dataframe: dd.DataFrame,
-        *,
-        subset_name: str,
-        subset_spec: ComponentSubset,
+            self,
+            dataframe: dd.DataFrame,
+            *,
+            subset_name: str,
+            subset_spec: ComponentSubset,
     ) -> dd.core.Scalar:
         if subset_name == "index":
             location = self.manifest.index.location
@@ -163,10 +174,10 @@ def _write_subset(
 
     @staticmethod
     def _create_write_task(
-        dataframe: dd.DataFrame,
-        *,
-        location: str,
-        schema: t.Dict[str, str],
+            dataframe: dd.DataFrame,
+            *,
+            location: str,
+            schema: t.Dict[str, str],
     ) -> dd.core.Scalar:
         """
         Creates a delayed Dask task to upload the given DataFrame to the remote storage location
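The two diagnostics wired in above work roughly as follows; the sketch is standalone, with made-up data and filenames mirroring the patch, and is not code from the repository. Note that dask.distributed.performance_report gathers its data from a distributed scheduler, so it needs an active Client (one only enters Fondant a few patches later), and DataFrame.visualize renders the task graph via graphviz, which is why both pyproject.toml and the component Dockerfile grow a graphviz dependency:

    import dask.dataframe as dd
    import pandas as pd
    from dask.distributed import Client, LocalCluster, performance_report

    cluster = LocalCluster()
    client = Client(cluster)

    ddf = dd.from_pandas(pd.DataFrame({"x": range(1_000)}), npartitions=4)

    # Render the task graph to an image (requires the graphviz system package).
    ddf.visualize("execution_graph.png")

    # Everything computed inside the block is profiled into the HTML report.
    with performance_report(filename="dask_report.html"):
        ddf.x.sum().compute()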
From 1c53aa1fc63f854f3567ab43da896e285df7538b Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Tue, 18 Jul 2023 14:41:04 +0200
Subject: [PATCH 03/21] add repartitioning strategy to fondant

---
 src/fondant/data_io.py | 45 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py
index 126bdf951..0b3fb41e8 100644
--- a/src/fondant/data_io.py
+++ b/src/fondant/data_io.py
@@ -1,5 +1,7 @@
 import logging
+import os
 import typing as t
+from abc import abstractmethod
 
 import dask.dataframe as dd
 from dask.diagnostics import ProgressBar
@@ -17,12 +19,34 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None
         self.component_spec = component_spec
         self.diagnostics_path = f"{self.manifest.base_path}/" \
                                 f"{self.manifest.component_id}/" \
-                                f"{self.manifest.run_id}
+                                f"{self.manifest.run_id}"
         self.performance_report_path = f"{self.diagnostics_path}/dask_report.html"
         self.execution_graph_path = f"{self.diagnostics_path}/execution_graph.png"
 
+        raise NotImplementedError
+
 
 class DaskDataLoader(DataIO):
+
+    @staticmethod
+    def partition_loaded_dataframe(dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Function that partitions the loaded dataframe depending on its partitions and the available
+        workers
+        Returns:
+            The partitioned dataframe
+        """
+        n_partitions = dataframe.npartitions
+        n_workers = os.cpu_count()
+        dataframe = dataframe.repartition(npartitions=n_partitions)
+        logger.info(f"The number of partitions of the input dataframe is {n_partitions}. The "
+                    f"available number of workers is {n_workers}.")
+        if n_partitions < n_workers:
+            dataframe = dataframe.repartition(npartitions=n_workers)
+            logger.info("Repartitioning the data before transforming to maximize worker usage")
+
+        return dataframe
+
     def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
         """
         Function that loads a subset from the manifest as a Dask dataframe.
@@ -86,14 +110,31 @@ def load_dataframe(self) -> dd.DataFrame:
                 how="left",
             )
 
+        dataframe = self.partition_loaded_dataframe(dataframe)
+
         logging.info(f"Columns of dataframe: {list(dataframe.columns)}")
 
         return dataframe
 
 
 class DaskDataWriter(DataIO):
+
+    @staticmethod
+    def partition_written_dataframe(dataframe: dd.DataFrame, partition_size="250MB") -> dd.DataFrame:
+        """
+        Function that partitions the written dataframe to smaller partitions based on a given
+        partition size.
+        """
+
+        dataframe = dataframe.repartition(partition_size=partition_size)
+        logger.info(f"repartitioning the written data such that the memory per partition is"
+                    f" {partition_size}")
+        return dataframe
+
     def write_dataframe(self, dataframe: dd.DataFrame) -> None:
         logging.info(f"Saving execution graph to {self.execution_graph_path}")
 
         dataframe.visualize(self.execution_graph_path)
 
         write_tasks = []
@@ -170,6 +211,8 @@ def _write_subset(
 
         schema = {field.name: field.type.value for field in subset_spec.fields.values()}
 
+        dataframe = self.partition_written_dataframe(dataframe)
+
         return self._create_write_task(dataframe, location=location, schema=schema)

From 897f5f839f081e83fef3f70638cf7c81bbfd9e97 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Tue, 18 Jul 2023 14:56:26 +0200
Subject: [PATCH 04/21] remove changes from component

---
 components/download_images/src/main.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/components/download_images/src/main.py b/components/download_images/src/main.py
index 5201270b8..51b9a2d53 100644
--- a/components/download_images/src/main.py
+++ b/components/download_images/src/main.py
@@ -7,7 +7,6 @@
 """
 import io
 import logging
-import os
 import traceback
 import urllib
 
@@ -124,10 +123,6 @@ def transform(
         max_aspect_ratio=max_aspect_ratio,
     )
 
-    # Remove duplicates from laion retrieval (global)
-    dataframe = dataframe.drop_duplicates()
-    dataframe = dataframe.repartition(npartitions=os.cpu_count())
-
     dataframe = dataframe.apply(
         lambda example: download_image_with_retry(
             url=example.images_url,
@@ -147,7 +142,7 @@
 
     # Remove images that could not be fetched
     dataframe = dataframe.dropna()
-    dataframe = dataframe.repartition(partition_size="250MB")
+
     return dataframe

From 2edcd91bb7777278ed41c474672a966e0c47872a Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Wed, 19 Jul 2023 09:29:09 +0200
Subject: [PATCH 05/21] change save path diagnostics

---
 .../components/generate_prompts/Dockerfile    |  4 ++--
 .../controlnet-interior-design/pipeline.py    | 23 +++++++++++--------
 src/fondant/data_io.py                        | 14 ++++++-----
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
index 70d8693b7..7072dd29e 100644
--- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
+++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
@@ -3,7 +3,7 @@ FROM --platform=linux/amd64 python:3.8-slim
 ## System dependencies
 RUN apt-get update && \
     apt-get upgrade -y && \
-    apt-get install git -y
+    apt-get install git graphviz -y
 
 # install requirements
 COPY requirements.txt /
@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt
 
 # Install Fondant
 # This is split from other requirements to leverage caching
-ARG FONDANT_VERSION=main
+ARG FONDANT_VERSION=0c482339c02b9de58b9f4d70c9877314200c0400
 RUN pip3 install 
fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/examples/pipelines/controlnet-interior-design/pipeline.py b/examples/pipelines/controlnet-interior-design/pipeline.py index 9b39a7379..73080694b 100644 --- a/examples/pipelines/controlnet-interior-design/pipeline.py +++ b/examples/pipelines/controlnet-interior-design/pipeline.py @@ -16,7 +16,7 @@ # Define component ops generate_prompts_op = ComponentOp( component_dir="components/generate_prompts", - arguments={"n_rows_to_load": None}, + arguments={"n_rows_to_load": 2}, ) laion_retrieval_op = ComponentOp.from_registry( name="prompt_based_laion_retrieval", @@ -24,11 +24,11 @@ "num_images": 2, "aesthetic_score": 9, "aesthetic_weight": 0.5, - "url": None, + "url": "http://34.91.40.120:1230/knn-service?key=AIzaSyBDPCHlqagh284jz6zXStrgLeTrNrZrDww", }, ) -download_images_op = ComponentOp.from_registry( - name="download_images", +download_images_op = ComponentOp( + component_dir="components/download_images", arguments={ "timeout": 1, "retries": 0, @@ -69,11 +69,14 @@ }, ) -pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH) +pipeline = Pipeline( + pipeline_name=pipeline_name, + base_path="/home/philippe/Scripts/express/local_artifact/controlnet_2", +) pipeline.add_op(generate_prompts_op) -pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op) -pipeline.add_op(download_images_op, dependencies=laion_retrieval_op) -pipeline.add_op(caption_images_op, dependencies=download_images_op) -pipeline.add_op(segment_images_op, dependencies=caption_images_op) -pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op) +# pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op) +# pipeline.add_op(download_images_op, dependencies=laion_retrieval_op) +# pipeline.add_op(caption_images_op, dependencies=download_images_op) +# pipeline.add_op(segment_images_op, dependencies=caption_images_op) +# pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op) diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index 4299dd26d..869355cf0 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -17,12 +17,15 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None self.manifest = manifest self.component_spec = component_spec self.diagnostics_path = ( - f"{self.manifest.base_path}/" - f"{self.manifest.component_id}/" - f"{self.manifest.run_id}" + f"{self.manifest.base_path}/" f"{self.manifest.component_id}" + ) + + self.performance_report_path = ( + f"{self.diagnostics_path}/{self.manifest.run_id}_dask_report.html" + ) + self.execution_graph_path = ( + f"{self.diagnostics_path}/{self.manifest.run_id}_execution_graph.png" ) - self.performance_report_path = f"{self.diagnostics_path}/dask_report.html" - self.execution_graph_path = f"{self.diagnostics_path}/execution_graph.png" class DaskDataLoader(DataIO): @@ -36,7 +39,6 @@ def partition_loaded_dataframe(dataframe: dd.DataFrame) -> dd.DataFrame: """ n_partitions = dataframe.npartitions n_workers = os.cpu_count() - dataframe = dataframe.repartition(npartitions=n_partitions) logger.info( f"The number of partitions of the input dataframe is {n_partitions}. 
The " f"available number of workers is {n_workers}.", From d63eed74e3e9bdd9495427c50e7bd1d6c8b51ff9 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 10:01:45 +0200 Subject: [PATCH 06/21] add client to dataIO --- .../components/generate_prompts/Dockerfile | 2 +- src/fondant/data_io.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index 7072dd29e..525aa343d 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG FONDANT_VERSION=0c482339c02b9de58b9f4d70c9877314200c0400 +ARG FONDANT_VERSION=2edcd91bb7777278ed41c474672a966e0c47872a RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index 869355cf0..bec9ec5c9 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -4,13 +4,16 @@ import dask.dataframe as dd from dask.diagnostics import ProgressBar -from dask.distributed import performance_report +from dask.distributed import Client, LocalCluster, performance_report from fondant.component_spec import ComponentSpec, ComponentSubset from fondant.manifest import Manifest logger = logging.getLogger(__name__) +cluster = LocalCluster() +client = Client(cluster) + class DataIO: def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None: From 51cadfeee0ced6b2402ada83030fdc98d06099b9 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 10:08:48 +0200 Subject: [PATCH 07/21] remove progressbar --- .../components/generate_prompts/Dockerfile | 2 +- src/fondant/data_io.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index 525aa343d..0d32cfa2a 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG FONDANT_VERSION=2edcd91bb7777278ed41c474672a966e0c47872a +ARG FONDANT_VERSION=d63eed74e3e9bdd9495427c50e7bd1d6c8b51ff9 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index bec9ec5c9..cb71130c2 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -3,7 +3,6 @@ import typing as t import dask.dataframe as dd -from dask.diagnostics import ProgressBar from dask.distributed import Client, LocalCluster, performance_report from fondant.component_spec import ComponentSpec, ComponentSubset @@ -172,7 +171,7 @@ def write_dataframe(self, dataframe: dd.DataFrame) -> None: ) write_tasks.append(write_subset_task) - with ProgressBar(), 
performance_report(filename=self.performance_report_path): + with performance_report(filename=self.performance_report_path): logging.info("Writing data...") logging.info(f"Saving performance report to {self.performance_report_path}") dd.compute(*write_tasks) From 678b3e1ef93e181cb182b39e018d9951446cc9b3 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 10:14:22 +0200 Subject: [PATCH 08/21] move client --- .../components/generate_prompts/Dockerfile | 2 +- src/fondant/data_io.py | 5 +---- src/fondant/executor.py | 4 ++++ 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index 0d32cfa2a..f8c0e0571 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG FONDANT_VERSION=d63eed74e3e9bdd9495427c50e7bd1d6c8b51ff9 +ARG FONDANT_VERSION=51cadfeee0ced6b2402ada83030fdc98d06099b9 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index cb71130c2..b96ef90ac 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -3,16 +3,13 @@ import typing as t import dask.dataframe as dd -from dask.distributed import Client, LocalCluster, performance_report +from dask.distributed import performance_report from fondant.component_spec import ComponentSpec, ComponentSubset from fondant.manifest import Manifest logger = logging.getLogger(__name__) -cluster = LocalCluster() -client = Client(cluster) - class DataIO: def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None: diff --git a/src/fondant/executor.py b/src/fondant/executor.py index aca8a235d..cd1e743c2 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -14,6 +14,7 @@ import dask.dataframe as dd import pandas as pd +from dask.distributed import Client, LocalCluster from fondant.component import ( Component, @@ -217,6 +218,9 @@ def _execute_component( Returns: A `dd.DataFrame` instance with initial data. 
""" + cluster = LocalCluster() + Client(cluster) + return component.load() From 44d54939764bce2f6ab62b01dfa985fea73ecbbd Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 10:24:27 +0200 Subject: [PATCH 09/21] move dataframe visualization --- .../components/generate_prompts/Dockerfile | 2 +- .../components/generate_prompts/requirements.txt | 1 + src/fondant/data_io.py | 8 ++++---- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index f8c0e0571..ebe30912d 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG FONDANT_VERSION=51cadfeee0ced6b2402ada83030fdc98d06099b9 +ARG FONDANT_VERSION=678b3e1ef93e181cb182b39e018d9951446cc9b3 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt b/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt index e69de29bb..470b510ef 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt @@ -0,0 +1 @@ +bokeh \ No newline at end of file diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index b96ef90ac..afa13c914 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -138,10 +138,6 @@ def partition_written_dataframe( return dataframe def write_dataframe(self, dataframe: dd.DataFrame) -> None: - logging.info(f"Saving execution graph to {self.execution_graph_path}") - - dataframe.visualize(self.execution_graph_path) - write_tasks = [] dataframe.index = dataframe.index.rename("id").astype("string") @@ -155,6 +151,10 @@ def write_dataframe(self, dataframe: dd.DataFrame) -> None: ) write_tasks.append(write_index_task) + logging.info(f"Saving execution graph to {self.execution_graph_path}") + + dataframe.visualize(self.execution_graph_path) + for subset_name, subset_spec in self.component_spec.produces.items(): subset_df = self._extract_subset_dataframe( dataframe, From 592f3aeb92e8dcf394d8b12926f20122154f96da Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:14:00 +0200 Subject: [PATCH 10/21] add bokeh to dependencies --- .../components/generate_prompts/Dockerfile | 2 +- .../components/generate_prompts/requirements.txt | 1 - pyproject.toml | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index ebe30912d..a8e756441 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG 
FONDANT_VERSION=678b3e1ef93e181cb182b39e018d9951446cc9b3 +ARG FONDANT_VERSION=18f7950d3923bdde8e9a92f12fa40ac094aa30ae RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt b/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt index 470b510ef..e69de29bb 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/requirements.txt @@ -1 +0,0 @@ -bokeh \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 16510af4b..31a548d37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ classifiers = [ python = ">= 3.8" dask = {extras = ["dataframe", "distributed"], version = ">= 2023.4.1"} graphviz = ">= 0.20.1" +bokeh = ">= 3.2.1" importlib-resources = { version = ">= 1.3", python = "<3.9" } jsonschema = ">= 4.18" pyarrow = ">= 11.0.0" From 5c56ddf9beeeb57da293699a70dcdc7c77f8068e Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:19:04 +0200 Subject: [PATCH 11/21] add bokeh to dependencies --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 31a548d37..13e01997e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ classifiers = [ python = ">= 3.8" dask = {extras = ["dataframe", "distributed"], version = ">= 2023.4.1"} graphviz = ">= 0.20.1" -bokeh = ">= 3.2.1" +bokeh = ">= 3.2.0" importlib-resources = { version = ">= 1.3", python = "<3.9" } jsonschema = ">= 4.18" pyarrow = ">= 11.0.0" From 8e575c7051196aa6c042e0aa29c30243e1450d42 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:20:14 +0200 Subject: [PATCH 12/21] add bokeh to dependencies --- .../components/generate_prompts/Dockerfile | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index a8e756441..e918d2cf4 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG FONDANT_VERSION=18f7950d3923bdde8e9a92f12fa40ac094aa30ae +ARG FONDANT_VERSION=5c56ddf9beeeb57da293699a70dcdc7c77f8068e RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/pyproject.toml b/pyproject.toml index 13e01997e..8d80b242b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ classifiers = [ python = ">= 3.8" dask = {extras = ["dataframe", "distributed"], version = ">= 2023.4.1"} graphviz = ">= 0.20.1" -bokeh = ">= 3.2.0" +bokeh = ">= 3.1.1" importlib-resources = { version = ">= 1.3", python = "<3.9" } jsonschema = ">= 4.18" pyarrow = ">= 11.0.0" From 0d1e76bc46bda97c8dd7fdf640ade6c302f6663e Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:31:17 +0200 Subject: [PATCH 13/21] move client to execute --- 
.../components/generate_prompts/Dockerfile | 2 +- src/fondant/executor.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile index e918d2cf4..445841b4a 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching -ARG FONDANT_VERSION=5c56ddf9beeeb57da293699a70dcdc7c77f8068e +ARG FONDANT_VERSION=8e575c7051196aa6c042e0aa29c30243e1450d42 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder diff --git a/src/fondant/executor.py b/src/fondant/executor.py index cd1e743c2..60ecb4e2c 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -176,6 +176,9 @@ def execute(self, component_cls: t.Type[Component]) -> None: Args: component_cls: The class of the component to execute """ + cluster = LocalCluster() + Client(cluster, silence_logs="info") + input_manifest = self._load_or_create_manifest() component = component_cls(self.spec, **self.user_arguments) @@ -218,9 +221,6 @@ def _execute_component( Returns: A `dd.DataFrame` instance with initial data. """ - cluster = LocalCluster() - Client(cluster) - return component.load() From 43132f5c6fefac81122425940ce917919a581c4a Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:34:54 +0200 Subject: [PATCH 14/21] silence logs --- src/fondant/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 60ecb4e2c..5f4519b83 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -177,7 +177,7 @@ def execute(self, component_cls: t.Type[Component]) -> None: component_cls: The class of the component to execute """ cluster = LocalCluster() - Client(cluster, silence_logs="info") + Client(cluster, silence_logs=logging.INFO) input_manifest = self._load_or_create_manifest() From 7f38c31886bce6b1dfe53768e60683cf77ddcdd0 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:36:59 +0200 Subject: [PATCH 15/21] remove log supression --- src/fondant/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 5f4519b83..2509ec80b 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -177,7 +177,7 @@ def execute(self, component_cls: t.Type[Component]) -> None: component_cls: The class of the component to execute """ cluster = LocalCluster() - Client(cluster, silence_logs=logging.INFO) + Client(cluster) input_manifest = self._load_or_create_manifest() From d764ad42887b628883fe916a8b0605a13c6b08e0 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 11:46:44 +0200 Subject: [PATCH 16/21] add log silencing to cluster --- src/fondant/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 2509ec80b..41e4719f7 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -176,7 +176,7 @@ def execute(self, component_cls: t.Type[Component]) -> None: Args: component_cls: The class of the 
component to execute
         """
-        cluster = LocalCluster()
+        cluster = LocalCluster(silence_logs=logging.INFO)
         Client(cluster)
 
         input_manifest = self._load_or_create_manifest()

From d50cc6bb1d3cb8beb8e6b3cb6309bf13638e06d5 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Wed, 19 Jul 2023 12:02:13 +0200
Subject: [PATCH 17/21] add worker log filter

---
 .../components/generate_prompts/Dockerfile |  2 +-
 src/fondant/executor.py                    | 11 ++++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
index 445841b4a..faf76baa7 100644
--- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
+++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/Dockerfile
@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt
 
 # Install Fondant
 # This is split from other requirements to leverage caching
-ARG FONDANT_VERSION=8e575c7051196aa6c042e0aa29c30243e1450d42
+ARG FONDANT_VERSION=d764ad42887b628883fe916a8b0605a13c6b08e0
 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
 
 # Set the working directory to the component folder

diff --git a/src/fondant/executor.py b/src/fondant/executor.py
index 41e4719f7..55b703106 100644
--- a/src/fondant/executor.py
+++ b/src/fondant/executor.py
@@ -30,6 +30,15 @@
 logger = logging.getLogger(__name__)
 
 
+class WorkerLogsFilter(logging.Filter):
+    def filter(self, record):
+        return not record.name.startswith("distributed")
+
+
+filter_worker_logs = WorkerLogsFilter()
+logger.addFilter(filter_worker_logs)
+
+
 class Executor(t.Generic[Component]):
     """An executor executes a Component."""
 
@@ -176,7 +185,7 @@ def execute(self, component_cls: t.Type[Component]) -> None:
         Args:
             component_cls: The class of the component to execute
         """
-        cluster = LocalCluster(silence_logs=logging.INFO)
+        cluster = LocalCluster()
         Client(cluster)

From 9e1eb3134c4663abd6c77f253251b970b21a12a5 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Wed, 19 Jul 2023 12:09:50 +0200
Subject: [PATCH 18/21] suppress logs

---
 src/fondant/executor.py | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/src/fondant/executor.py b/src/fondant/executor.py
index 55b703106..17f8ac2d0 100644
--- a/src/fondant/executor.py
+++ b/src/fondant/executor.py
@@ -28,15 +28,8 @@
 from fondant.manifest import Manifest
 
 logger = logging.getLogger(__name__)
-
-
-class WorkerLogsFilter(logging.Filter):
-    def filter(self, record):
-        return not record.name.startswith("distributed")
-
-
-filter_worker_logs = WorkerLogsFilter()
-logger.addFilter(filter_worker_logs)
+dask_logger = logging.getLogger("distributed.process")
+dask_logger.setLevel(logging.WARNING)
 
 
 class Executor(t.Generic[Component]):
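Patches 16 through 19 are successive attempts at the same goal: keeping dask.distributed's worker chatter out of the component logs. The mechanism the series eventually settles on is plain stdlib logging, raising the level on dask's dotted logger names. A short sketch with an illustrative subset of those names:

    import logging

    # Each dask.distributed subsystem logs under its own dotted name; raising the
    # level on those loggers hides their INFO output without touching your own logger.
    for name in ("distributed.worker", "distributed.scheduler", "distributed.nanny"):
        logging.getLogger(name).setLevel(logging.WARNING)

    logging.getLogger(__name__).info("still visible")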
From f344ea848dd8e4129a9e55eab6f45344d5b8dc43 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Wed, 19 Jul 2023 12:16:14 +0200
Subject: [PATCH 19/21] suppress logs

---
 src/fondant/executor.py | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/fondant/executor.py b/src/fondant/executor.py
index 17f8ac2d0..16f0a3b99 100644
--- a/src/fondant/executor.py
+++ b/src/fondant/executor.py
@@ -28,8 +28,20 @@
 from fondant.manifest import Manifest
 
 logger = logging.getLogger(__name__)
-dask_logger = logging.getLogger("distributed.process")
-dask_logger.setLevel(logging.WARNING)
+
+DASK_LOGS_TO_SUPPRESS = [
+    "distributed.process",
+    "distributed.scheduler",
+    "distributed.nanny",
+    "distributed.worker",
+    "distributed.core",
+    "distributed.comm.tcp",
+    "distributed.batched",
+]
+
+for logger_name in DASK_LOGS_TO_SUPPRESS:
+    dask_logger = logging.getLogger(logger_name)
+    dask_logger.setLevel(logging.WARNING)
 
 
 class Executor(t.Generic[Component]):

From 08b74ccc6d65d0880f7d0debf83e905f0bff3029 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Wed, 19 Jul 2023 15:23:47 +0200
Subject: [PATCH 20/21] add output_partition_size as an argument and revert diagnostics

---
 src/fondant/component.py                      | 14 +++---
 src/fondant/component_spec.py                 |  9 ++++
 src/fondant/data_io.py                        | 46 +++++++++---------
 src/fondant/exceptions.py                     |  4 ++
 src/fondant/executor.py                       | 14 +++---
 src/fondant/pipeline.py                       | 38 ++++++++++++++-
 .../example_1/docker-compose.yml              |  6 ++-
 .../example_2/docker-compose.yml              |  4 ++
 .../component_specs/kubeflow_component.yaml   |  9 +++-
 tests/test_compiler.py                        | 10 +++-
 tests/test_component.py                       | 17 +++++--
 tests/test_pipeline.py                        | 47 ++++++++++++++++++-
 12 files changed, 168 insertions(+), 50 deletions(-)

diff --git a/src/fondant/component.py b/src/fondant/component.py
index 724f4ea8b..f5c743e3e 100644
--- a/src/fondant/component.py
+++ b/src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         raise NotImplementedError
 
 
+class DaskWriteComponent(BaseComponent):
+    """Component that accepts a Dask DataFrame and writes its contents."""
+
+    def write(self, dataframe: dd.DataFrame) -> None:
+        raise NotImplementedError
+
+
 class PandasTransformComponent(BaseComponent):
     """Component that transforms the incoming dataset partition per partition as a pandas
     DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         raise NotImplementedError
 
 
-class DaskWriteComponent(BaseComponent):
-    """Component that accepts a Dask DataFrame and writes its contents."""
-
-    def write(self, dataframe: dd.DataFrame) -> None:
-        raise NotImplementedError
-
-
 Component = t.TypeVar("Component", bound=BaseComponent)
 """Component type which can represents any of the subclasses of BaseComponent"""

diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py
index ad204c6e6..09e26be08 100644
--- a/src/fondant/component_spec.py
+++ b/src/fondant/component_spec.py
@@ -256,6 +256,13 @@ def from_fondant_component_spec(
                 "type": "JsonObject",
                 "default": "None",
             },
+            {
+                "name": "output_partition_size",
+                "description": "The size of the output partition size, defaults"
+                " to 250MB. 
Set to None to disable partitioning the output", + "type": "String", + "default": "250MB", + }, *( { "name": arg.name, @@ -285,6 +292,8 @@ def from_fondant_component_spec( {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--output_partition_size", + {"inputValue": "output_partition_size"}, *cls._dump_args(fondant_component.args.values()), "--output_manifest_path", {"outputPath": "output_manifest_path"}, diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index afa13c914..8494dd55e 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -3,7 +3,7 @@ import typing as t import dask.dataframe as dd -from dask.distributed import performance_report +from dask.diagnostics import ProgressBar from fondant.component_spec import ComponentSpec, ComponentSubset from fondant.manifest import Manifest @@ -19,15 +19,11 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None f"{self.manifest.base_path}/" f"{self.manifest.component_id}" ) - self.performance_report_path = ( - f"{self.diagnostics_path}/{self.manifest.run_id}_dask_report.html" - ) - self.execution_graph_path = ( - f"{self.diagnostics_path}/{self.manifest.run_id}_execution_graph.png" - ) - class DaskDataLoader(DataIO): + def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec): + super().__init__(manifest=manifest, component_spec=component_spec) + @staticmethod def partition_loaded_dataframe(dataframe: dd.DataFrame) -> dd.DataFrame: """ @@ -121,20 +117,27 @@ def load_dataframe(self) -> dd.DataFrame: class DaskDataWriter(DataIO): - @staticmethod - def partition_written_dataframe( - dataframe: dd.DataFrame, - partition_size="250MB", - ) -> dd.DataFrame: + def __init__( + self, + *, + manifest: Manifest, + component_spec: ComponentSpec, + output_partition_size: t.Optional[str] = None, + ): + super().__init__(manifest=manifest, component_spec=component_spec) + self.output_partition_size = output_partition_size + + def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: """ Function that partitions the written dataframe to smaller partitions based on a given partition size. 
""" - dataframe = dataframe.repartition(partition_size=partition_size) - logger.info( - f"repartitioning the written data such that the memory per partition is" - f" {partition_size}", - ) + if self.output_partition_size: + dataframe = dataframe.repartition(partition_size=self.output_partition_size) + logger.info( + f"repartitioning the written data such that the memory per partition is" + f" {self.output_partition_size}", + ) return dataframe def write_dataframe(self, dataframe: dd.DataFrame) -> None: @@ -151,10 +154,6 @@ def write_dataframe(self, dataframe: dd.DataFrame) -> None: ) write_tasks.append(write_index_task) - logging.info(f"Saving execution graph to {self.execution_graph_path}") - - dataframe.visualize(self.execution_graph_path) - for subset_name, subset_spec in self.component_spec.produces.items(): subset_df = self._extract_subset_dataframe( dataframe, @@ -168,9 +167,8 @@ def write_dataframe(self, dataframe: dd.DataFrame) -> None: ) write_tasks.append(write_subset_task) - with performance_report(filename=self.performance_report_path): + with ProgressBar(): logging.info("Writing data...") - logging.info(f"Saving performance report to {self.performance_report_path}") dd.compute(*write_tasks) @staticmethod diff --git a/src/fondant/exceptions.py b/src/fondant/exceptions.py index 8ce549ed7..019ab0644 100644 --- a/src/fondant/exceptions.py +++ b/src/fondant/exceptions.py @@ -15,6 +15,10 @@ class InvalidComponentSpec(ValidationError, FondantException): """Thrown when a component spec cannot be validated against the schema.""" +class InvalidComponentOpDefinition(ValidationError, FondantException): + """Thrown when a componentOp is invalid.""" + + class InvalidPipelineDefinition(ValidationError, FondantException): """Thrown when a pipeline definition is invalid.""" diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 16f0a3b99..446199cd2 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -14,7 +14,6 @@ import dask.dataframe as dd import pandas as pd -from dask.distributed import Client, LocalCluster from fondant.component import ( Component, @@ -54,7 +53,7 @@ def __init__( input_manifest_path: t.Union[str, Path], output_manifest_path: t.Union[str, Path], metadata: t.Dict[str, t.Any], - user_arguments: t.Dict[str, Argument], + user_arguments: t.Dict[str, t.Any], ) -> None: self.spec = spec self.input_manifest_path = input_manifest_path @@ -181,7 +180,13 @@ def _execute_component( def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest): """Create a data writer given a manifest and writes out the index and subsets.""" - data_writer = DaskDataWriter(manifest=manifest, component_spec=self.spec) + output_partition_size = self.user_arguments["output_partition_size"] + data_writer = DaskDataWriter( + manifest=manifest, + component_spec=self.spec, + output_partition_size=output_partition_size, + ) + data_writer.write_dataframe(dataframe) def execute(self, component_cls: t.Type[Component]) -> None: @@ -190,9 +195,6 @@ def execute(self, component_cls: t.Type[Component]) -> None: Args: component_cls: The class of the component to execute """ - cluster = LocalCluster() - Client(cluster) - input_manifest = self._load_or_create_manifest() component = component_cls(self.spec, **self.user_arguments) diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 9693af05f..6fa7e1a6b 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -12,7 +12,7 @@ from importlib_resources import files # type: ignore from fondant.component_spec 
import ComponentSpec -from fondant.exceptions import InvalidPipelineDefinition +from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition from fondant.import_utils import is_kfp_available from fondant.manifest import Manifest @@ -32,6 +32,8 @@ class ComponentOp: Arguments: component_dir: The path to the component directory. arguments: A dictionary containing the argument name and value for the operation. + output_partition_size: the size of the output written dataset. Defaults to 250MB, + set to None to disable partitioning the output, number_of_gpus: The number of gpus to assign to the operation node_pool_name: The name of the node pool to which the operation will be assigned. p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths, @@ -57,13 +59,15 @@ def __init__( component_dir: t.Union[str, Path], *, arguments: t.Optional[t.Dict[str, t.Any]] = None, + output_partition_size: t.Optional[str] = "250MB", number_of_gpus: t.Optional[int] = None, node_pool_name: t.Optional[str] = None, p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None, ephemeral_storage_size: t.Optional[str] = None, ) -> None: self.component_dir = Path(component_dir) - self.arguments = arguments or {} + self.output_partitioning_size = output_partition_size + self.arguments = self._set_arguments(arguments) self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, @@ -75,6 +79,36 @@ def __init__( self.p_volumes = p_volumes self.ephemeral_storage_size = ephemeral_storage_size + def _set_arguments( + self, + arguments: t.Optional[t.Dict[str, t.Any]], + ) -> t.Dict[str, t.Any]: + """Set component arguments based on provided arguments and relevant ComponentOp + parameters. + """ + + def _validate_file_size(file_size): + # Define the regular expression pattern to match file size notations: KB, MB, GB or TB + pattern = r"^\d+(?:\.\d+)?(?:KB|MB|GB|TB)$" + + # Use the re.match() function to check if the provided file_size matches the pattern + return bool(re.match(pattern, file_size, re.I)) + + arguments = arguments or {} + + if self.output_partitioning_size is not None: + if not _validate_file_size(file_size=str(self.output_partitioning_size)): + msg = ( + f"Invalid partition size defined `{self.output_partitioning_size}`," + " partition size must be a string followed by a file size notation" + " e.g. 
('250MB')" + ) + raise InvalidComponentOpDefinition(msg) + + arguments["output_partition_size"] = self.output_partitioning_size + + return arguments + @property def dockerfile_path(self) -> t.Optional[Path]: path = self.component_dir / "Dockerfile" diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index 46ed2260b..f1b5364db 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -9,6 +9,8 @@ services: - /foo/bar/first_component/manifest.json - --storage_args - a dummy string arg + - --output_partition_size + - 250MB - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -48,6 +50,8 @@ services: - a dummy string arg - --some_list - '[1, 2, 3]' + - --output_partition_size + - 10MB - --component_spec - '{"name": "Third component", "description": "This is an example component", "image": "example_component:latest", "consumes": {"images": {"fields": {"data": @@ -63,4 +67,4 @@ services: second_component: condition: service_completed_successfully volumes: [] -version: '3.8' \ No newline at end of file +version: '3.8' diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index 44bfc162e..4e02ca245 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -9,6 +9,8 @@ services: - /foo/bar/first_component/manifest.json - --storage_args - a dummy string arg + - --output_partition_size + - 250MB - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -26,6 +28,8 @@ services: - '0' - --padding - '0' + - --output_partition_size + - 250MB - --component_spec - '{"name": "Image cropping", "description": "Component that removes single-colored borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index 5f2e33690..57ccdb565 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -11,6 +11,11 @@ inputs: description: The component specification as a dictionary type: JsonObject default: None +- name: output_partition_size + description: The size of the output partition size, defaults to 250MB. 
Set to + None to disable partitioning the output + type: String + default: 250MB - name: storage_args description: Storage arguments type: String @@ -30,7 +35,9 @@ implementation: - inputValue: metadata - --component_spec - inputValue: component_spec + - --output_partition_size + - inputValue: output_partition_size - --storage_args - inputValue: storage_args - --output_manifest_path - - outputPath: output_manifest_path \ No newline at end of file + - outputPath: output_manifest_path diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 7ad7382c9..7dd98e795 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -6,9 +6,9 @@ from fondant.compiler import DockerCompiler from fondant.pipeline import ComponentOp, Pipeline -COMPONENTS_PATH = Path("./tests/example_pipelines/valid_pipeline") +COMPONENTS_PATH = Path("./example_pipelines/valid_pipeline") -VALID_DOCKER_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/") +VALID_DOCKER_PIPELINE = Path("./example_pipelines/compiled_pipeline/") TEST_PIPELINES = [ ( @@ -17,10 +17,12 @@ ComponentOp( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, + output_partition_size="250MB", ), ComponentOp( Path(COMPONENTS_PATH / "example_1" / "second_component"), arguments={"storage_args": "a dummy string arg"}, + output_partition_size=None, ), ComponentOp( Path(COMPONENTS_PATH / "example_1" / "fourth_component"), @@ -28,6 +30,7 @@ "storage_args": "a dummy string arg", "some_list": [1, 2, 3], }, + output_partition_size="10MB", ), ], ), @@ -94,6 +97,9 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): with open(output_path) as src, open( VALID_DOCKER_PIPELINE / example_dir / "docker-compose.yml", ) as truth: + d = yaml.safe_load(src) + with open(f"{example_dir}.yml", "w") as outfile: + yaml.dump(d, outfile, default_flow_style=False) assert yaml.safe_load(src) == yaml.safe_load(truth) diff --git a/tests/test_component.py b/tests/test_component.py index 5fa862d8f..21a8a8eb3 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -121,12 +121,13 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: "override_default_arg": "bar", "override_default_none_arg": 3.14, "override_default_arg_with_none": None, + "output_partition_size": "250MB", } @pytest.mark.usefixtures("_patched_data_writing") def test_load_component(): - # Mock CLI argumentsload + # Mock CLI arguments load sys.argv = [ "", "--metadata", @@ -142,13 +143,15 @@ def test_load_component(): ] class MyLoadComponent(DaskLoadComponent): - def __init__(self, *args, flag, value): + def __init__(self, *args, flag, value, output_partition_size): self.flag = flag self.value = value + self.output_partition_size = output_partition_size def load(self): assert self.flag == "success" assert self.value == 1 + assert self.output_partition_size == "250MB" data = { "id": [0, 1], "captions_data": ["hello world", "this is another caption"], @@ -182,9 +185,10 @@ def test_dask_transform_component(): ] class MyDaskComponent(DaskTransformComponent): - def __init__(self, *args, flag, value): + def __init__(self, *args, flag, value, output_partition_size): self.flag = flag self.value = value + self.output_partition_size = output_partition_size def transform(self, dataframe): assert self.flag == "success" @@ -223,9 +227,10 @@ def test_pandas_transform_component(): ] class MyPandasComponent(PandasTransformComponent): - def __init__(self, *args, flag, value): + def __init__(self, *args, flag, 
value, output_partition_size): assert flag == "success" assert value == 1 + assert output_partition_size == "250MB" def transform(self, dataframe): assert isinstance(dataframe, pd.DataFrame) @@ -336,13 +341,15 @@ def test_write_component(): ] class MyWriteComponent(DaskWriteComponent): - def __init__(self, *args, flag, value): + def __init__(self, *args, flag, value, output_partition_size): self.flag = flag self.value = value + self.output_partition_size = output_partition_size def write(self, dataframe): assert self.flag == "success" assert self.value == 1 + assert self.output_partition_size == "250MB" assert isinstance(dataframe, dd.DataFrame) executor = DaskWriteExecutor.from_args() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index cbd19e843..ed58d4627 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -3,10 +3,10 @@ import pytest import yaml -from fondant.exceptions import InvalidPipelineDefinition +from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition from fondant.pipeline import ComponentOp, Pipeline -valid_pipeline_path = Path(__file__).parent / "example_pipelines/valid_pipeline" +valid_pipeline_path = Path("example_pipelines/valid_pipeline") invalid_pipeline_path = Path(__file__).parent / "example_pipelines/invalid_pipeline" @@ -23,6 +23,49 @@ def default_pipeline_args(): } +@pytest.mark.parametrize( + "valid_pipeline_example", + [ + ( + "example_1", + ["first_component", "second_component", "third_component"], + ), + ], +) +def test_component_op( + valid_pipeline_example, +): + component_args = {"storage_args": "a dummy string arg"} + example_dir, component_names = valid_pipeline_example + components_path = Path(valid_pipeline_path / example_dir) + + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size=None, + ) + + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size="250MB", + ) + + with pytest.raises(InvalidComponentOpDefinition): + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size=10, + ) + + with pytest.raises(InvalidComponentOpDefinition): + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size="250 MB", + ) + + @pytest.mark.parametrize( "valid_pipeline_example", [ From df19ad3db5f28d076b85d369aa12eadac0393e05 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 19 Jul 2023 16:04:52 +0200 Subject: [PATCH 21/21] remove output_partition from user arguments --- .../controlnet-interior-design/pipeline.py | 23 ++++++++----------- src/fondant/executor.py | 8 ++++--- tests/test_compiler.py | 7 ++---- tests/test_component.py | 17 +++++--------- tests/test_pipeline.py | 2 +- 5 files changed, 24 insertions(+), 33 deletions(-) diff --git a/examples/pipelines/controlnet-interior-design/pipeline.py b/examples/pipelines/controlnet-interior-design/pipeline.py index 73080694b..9b39a7379 100644 --- a/examples/pipelines/controlnet-interior-design/pipeline.py +++ b/examples/pipelines/controlnet-interior-design/pipeline.py @@ -16,7 +16,7 @@ # Define component ops generate_prompts_op = ComponentOp( component_dir="components/generate_prompts", - arguments={"n_rows_to_load": 2}, + arguments={"n_rows_to_load": None}, ) laion_retrieval_op = ComponentOp.from_registry( name="prompt_based_laion_retrieval", @@ -24,11 +24,11 @@ "num_images": 2, "aesthetic_score": 9, "aesthetic_weight": 0.5, - "url": 
"http://34.91.40.120:1230/knn-service?key=AIzaSyBDPCHlqagh284jz6zXStrgLeTrNrZrDww", + "url": None, }, ) -download_images_op = ComponentOp( - component_dir="components/download_images", +download_images_op = ComponentOp.from_registry( + name="download_images", arguments={ "timeout": 1, "retries": 0, @@ -69,14 +69,11 @@ }, ) -pipeline = Pipeline( - pipeline_name=pipeline_name, - base_path="/home/philippe/Scripts/express/local_artifact/controlnet_2", -) +pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH) pipeline.add_op(generate_prompts_op) -# pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op) -# pipeline.add_op(download_images_op, dependencies=laion_retrieval_op) -# pipeline.add_op(caption_images_op, dependencies=download_images_op) -# pipeline.add_op(segment_images_op, dependencies=caption_images_op) -# pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op) +pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op) +pipeline.add_op(download_images_op, dependencies=laion_retrieval_op) +pipeline.add_op(caption_images_op, dependencies=download_images_op) +pipeline.add_op(segment_images_op, dependencies=caption_images_op) +pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 446199cd2..1aa9a1b32 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -54,12 +54,14 @@ def __init__( output_manifest_path: t.Union[str, Path], metadata: t.Dict[str, t.Any], user_arguments: t.Dict[str, t.Any], + output_partition_size: t.Optional[str] = "250MB", ) -> None: self.spec = spec self.input_manifest_path = input_manifest_path self.output_manifest_path = output_manifest_path self.metadata = metadata self.user_arguments = user_arguments + self.output_partition_size = output_partition_size @classmethod def from_file( @@ -99,7 +101,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor": input_manifest_path = args_dict.pop("input_manifest_path") output_manifest_path = args_dict.pop("output_manifest_path") metadata = args_dict.pop("metadata") - + output_partition_size = args_dict.pop("output_partition_size") metadata = json.loads(metadata) if metadata else {} return cls( @@ -108,6 +110,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor": output_manifest_path=output_manifest_path, metadata=metadata, user_arguments=args_dict, + output_partition_size=output_partition_size, ) @classmethod @@ -180,11 +183,10 @@ def _execute_component( def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest): """Create a data writer given a manifest and writes out the index and subsets.""" - output_partition_size = self.user_arguments["output_partition_size"] data_writer = DaskDataWriter( manifest=manifest, component_spec=self.spec, - output_partition_size=output_partition_size, + output_partition_size=self.output_partition_size, ) data_writer.write_dataframe(dataframe) diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 7dd98e795..d768805be 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -6,9 +6,9 @@ from fondant.compiler import DockerCompiler from fondant.pipeline import ComponentOp, Pipeline -COMPONENTS_PATH = Path("./example_pipelines/valid_pipeline") +COMPONENTS_PATH = Path("./tests/example_pipelines/valid_pipeline") -VALID_DOCKER_PIPELINE = Path("./example_pipelines/compiled_pipeline/") +VALID_DOCKER_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/") 
TEST_PIPELINES = [ ( @@ -97,9 +97,6 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): with open(output_path) as src, open( VALID_DOCKER_PIPELINE / example_dir / "docker-compose.yml", ) as truth: - d = yaml.safe_load(src) - with open(f"{example_dir}.yml", "w") as outfile: - yaml.dump(d, outfile, default_flow_style=False) assert yaml.safe_load(src) == yaml.safe_load(truth) diff --git a/tests/test_component.py b/tests/test_component.py index 21a8a8eb3..707ea88c0 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -121,7 +121,6 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: "override_default_arg": "bar", "override_default_none_arg": 3.14, "override_default_arg_with_none": None, - "output_partition_size": "250MB", } @@ -143,15 +142,13 @@ def test_load_component(): ] class MyLoadComponent(DaskLoadComponent): - def __init__(self, *args, flag, value, output_partition_size): + def __init__(self, *args, flag, value): self.flag = flag self.value = value - self.output_partition_size = output_partition_size def load(self): assert self.flag == "success" assert self.value == 1 - assert self.output_partition_size == "250MB" data = { "id": [0, 1], "captions_data": ["hello world", "this is another caption"], @@ -178,6 +175,8 @@ def test_dask_transform_component(): "success", "--value", "1", + "--output_partition_size", + "250MB", "--output_manifest_path", str(components_path / "output_manifest.json"), "--component_spec", @@ -185,10 +184,9 @@ def test_dask_transform_component(): ] class MyDaskComponent(DaskTransformComponent): - def __init__(self, *args, flag, value, output_partition_size): + def __init__(self, *args, flag, value): self.flag = flag self.value = value - self.output_partition_size = output_partition_size def transform(self, dataframe): assert self.flag == "success" @@ -227,10 +225,9 @@ def test_pandas_transform_component(): ] class MyPandasComponent(PandasTransformComponent): - def __init__(self, *args, flag, value, output_partition_size): + def __init__(self, *args, flag, value): assert flag == "success" assert value == 1 - assert output_partition_size == "250MB" def transform(self, dataframe): assert isinstance(dataframe, pd.DataFrame) @@ -341,15 +338,13 @@ def test_write_component(): ] class MyWriteComponent(DaskWriteComponent): - def __init__(self, *args, flag, value, output_partition_size): + def __init__(self, *args, flag, value): self.flag = flag self.value = value - self.output_partition_size = output_partition_size def write(self, dataframe): assert self.flag == "success" assert self.value == 1 - assert self.output_partition_size == "250MB" assert isinstance(dataframe, dd.DataFrame) executor = DaskWriteExecutor.from_args() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index ed58d4627..6b58c7abf 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -6,7 +6,7 @@ from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition from fondant.pipeline import ComponentOp, Pipeline -valid_pipeline_path = Path("example_pipelines/valid_pipeline") +valid_pipeline_path = Path(__file__).parent / "example_pipelines/valid_pipeline" invalid_pipeline_path = Path(__file__).parent / "example_pipelines/invalid_pipeline"
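Taken together, the last two patches leave output_partition_size as a pipeline-level knob rather than per-component repartitioning code. A usage sketch based on the ComponentOp signature introduced in patch 20; the component directory and arguments are illustrative, and constructing a ComponentOp assumes a valid component directory containing a fondant_component.yaml:

    from fondant.pipeline import ComponentOp

    # Default: written subsets are repartitioned into roughly 250MB partitions.
    op = ComponentOp(
        component_dir="components/download_images",
        arguments={"timeout": 1},
    )

    # Any size matching r"^\d+(?:\.\d+)?(?:KB|MB|GB|TB)$" (case-insensitive) is accepted.
    op = ComponentOp(
        component_dir="components/download_images",
        output_partition_size="512MB",
    )

    # None disables output repartitioning; other values raise InvalidComponentOpDefinition.
    op = ComponentOp(
        component_dir="components/download_images",
        output_partition_size=None,
    )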