diff --git a/docs/pipeline.md b/docs/pipeline.md
index 4450b0329..8e12cb6bf 100644
--- a/docs/pipeline.md
+++ b/docs/pipeline.md
@@ -8,7 +8,7 @@ To build a pipeline, you need to define a set of component operations called `Co
 
 The component specifications include the location of the Docker image in a registry.
 
-The runtime configuration consists of the component's arguments and the definition of node pools and resources. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.
+The runtime configuration consists of the component's arguments and the definition of node pools, resources, and a custom partitioning specification. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.
 
 Here is an example of how to build a pipeline using Fondant:
 ```python
@@ -46,7 +46,72 @@ Next, we define two operations: `load_from_hub_op`, which is a based from a reus
 !!! note "IMPORTANT"
   Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.
 
+## Setting custom partitioning parameters
+
+When working with Fondant, each component deals with datasets, and Dask is used internally
+to handle datasets larger than the available memory. To achieve this, the data is divided
+into smaller chunks called "partitions" that can be processed in parallel. A sufficient number of partitions
+enables parallel processing, where multiple workers process different partitions simultaneously,
+and smaller partitions are more likely to fit into memory.
+
+### How Fondant handles partitions
+
+**1) Repartitioning the loaded dataframe:** This step is optional and comes into play when the number
+of partitions is smaller than the number of workers available on the data processing instance.
+Repartitioning ensures that the maximum number of workers is utilized, leading to faster,
+parallel processing.
+
+**2) Repartitioning the written dataframe:** The written dataframe is also repartitioned into
+smaller partitions (250MB by default) so that the next component can load these partitions into memory.
+
+### Customizing partitioning
+
+By default, Fondant handles the partitioning automatically, but you can disable this and define your
+own custom partitioning logic if you have specific requirements.
+
+Here's an example of disabling the automatic partitioning:
+
+```python
+caption_images_op = ComponentOp(
+    component_dir="components/captioning_component",
+    arguments={
+        "model_id": "Salesforce/blip-image-captioning-base",
+        "batch_size": 2,
+        "max_new_tokens": 50,
+    },
+    input_partition_rows='disable',
+    output_partition_size='disable',
+)
+```
+
+The code snippet above disables automatic partitioning for both the loaded and written dataframes,
+allowing you to define your own partitioning logic inside the components.
+
+You can also set custom partitioning parameters to override the default settings:
+
+```python
+caption_images_op = ComponentOp(
+    component_dir="components/captioning_component",
+    arguments={
+        "model_id": "Salesforce/blip-image-captioning-base",
+        "batch_size": 2,
+        "max_new_tokens": 50,
+    },
+    input_partition_rows=100,
+    output_partition_size="10MB",
+)
+```
+
+In the example above, each partition of the loaded dataframe will contain approximately one hundred rows,
+and the size of the output partitions will be around 10MB. This capability is useful in scenarios
+where processing one row significantly increases the number of rows in the dataset
+(resulting in dataset explosion) or causes a substantial increase in row size (e.g., fetching images from URLs).
+
+By setting a lower value for input partition rows, you can mitigate issues where the processed data
+grows larger than the available memory before being written to disk.
 
 ## Compiling a pipeline
 
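For readers who want to see what the row-based setting above boils down to, here is a minimal, self-contained sketch in plain Dask; the dataframe, column name, and row counts are illustrative and not part of Fondant's API:

```python
import dask.dataframe as dd
import pandas as pd

# Illustrative dataset: 1,000 rows in a single partition.
ddf = dd.from_pandas(pd.DataFrame({"value": range(1_000)}), npartitions=1)

input_partition_rows = 100
total_rows = len(ddf.index)  # computes only the index length
n_partitions = (total_rows // input_partition_rows) + 1  # +1 absorbs any remainder rows

ddf = ddf.repartition(npartitions=n_partitions)
print(ddf.npartitions)  # 11 partitions of roughly 90-100 rows each
```

The loader changes in `src/fondant/data_io.py` below apply the same arithmetic to the loaded dataframe before handing it to the component.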
diff --git a/examples/pipelines/controlnet-interior-design/pipeline.py b/examples/pipelines/controlnet-interior-design/pipeline.py
index 9f9828a90..f44f07d24 100644
--- a/examples/pipelines/controlnet-interior-design/pipeline.py
+++ b/examples/pipelines/controlnet-interior-design/pipeline.py
@@ -17,6 +17,7 @@
 generate_prompts_op = ComponentOp(
     component_dir="components/generate_prompts",
     arguments={"n_rows_to_load": None},
+    output_partition_size="disable",
 )
 laion_retrieval_op = ComponentOp.from_registry(
     name="prompt_based_laion_retrieval",
diff --git a/src/fondant/component.py b/src/fondant/component.py
index 724f4ea8b..f5c743e3e 100644
--- a/src/fondant/component.py
+++ b/src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         raise NotImplementedError
 
 
+class DaskWriteComponent(BaseComponent):
+    """Component that accepts a Dask DataFrame and writes its contents."""
+
+    def write(self, dataframe: dd.DataFrame) -> None:
+        raise NotImplementedError
+
+
 class PandasTransformComponent(BaseComponent):
     """Component that transforms the incoming dataset partition per partition
     as a pandas DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         raise NotImplementedError
 
 
-class DaskWriteComponent(BaseComponent):
-    """Component that accepts a Dask DataFrame and writes its contents."""
-
-    def write(self, dataframe: dd.DataFrame) -> None:
-        raise NotImplementedError
-
-
 Component = t.TypeVar("Component", bound=BaseComponent)
 """Component type which can represents any of the subclasses of BaseComponent"""
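As a usage illustration for the relocated `DaskWriteComponent` base class, a hypothetical subclass might look as follows; the class name and output path are invented for this sketch, and the argument plumbing a real component needs is omitted:

```python
import dask.dataframe as dd

from fondant.component import DaskWriteComponent


class ParquetWriteComponent(DaskWriteComponent):
    """Hypothetical component that writes the dataset out as parquet files."""

    def write(self, dataframe: dd.DataFrame) -> None:
        # The executor passes in the loaded (and repartitioned) dataframe;
        # Dask writes one file per partition.
        dataframe.to_parquet("/tmp/fondant-output")
```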
diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py
index ad204c6e6..581342025 100644
--- a/src/fondant/component_spec.py
+++ b/src/fondant/component_spec.py
@@ -256,6 +256,20 @@ def from_fondant_component_spec(
                     "type": "JsonObject",
                     "default": "None",
                 },
+                {
+                    "name": "input_partition_rows",
+                    "description": "The number of rows to load per partition. Set to override the"
+                    " automatic partitioning",
+                    "type": "String",
+                    "default": "None",
+                },
+                {
+                    "name": "output_partition_size",
+                    "description": "The size of the output partition size, defaults"
+                    " to 250MB. Set to `disable` to disable the automatic partitioning",
+                    "type": "String",
+                    "default": "None",
+                },
                 *(
                     {
                         "name": arg.name,
@@ -285,6 +299,10 @@ def from_fondant_component_spec(
                 {"inputValue": "metadata"},
                 "--component_spec",
                 {"inputValue": "component_spec"},
+                "--input_partition_rows",
+                {"inputValue": "input_partition_rows"},
+                "--output_partition_size",
+                {"inputValue": "output_partition_size"},
                 *cls._dump_args(fondant_component.args.values()),
                 "--output_manifest_path",
                 {"outputPath": "output_manifest_path"},
diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py
index b3c519273..675422301 100644
--- a/src/fondant/data_io.py
+++ b/src/fondant/data_io.py
@@ -1,4 +1,5 @@
 import logging
+import os
 import typing as t
 
 import dask.dataframe as dd
@@ -17,6 +18,62 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None
 
 
 class DaskDataLoader(DataIO):
+    def __init__(
+        self,
+        *,
+        manifest: Manifest,
+        component_spec: ComponentSpec,
+        input_partition_rows: t.Optional[t.Union[int, str]] = None,
+    ):
+        super().__init__(manifest=manifest, component_spec=component_spec)
+        self.input_partition_rows = input_partition_rows
+
+    def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Partition the loaded dataframe depending on its current partitioning and the available
+        workers.
+        Returns:
+            The partitioned dataframe.
+        """
+        if self.input_partition_rows != "disable":
+            if isinstance(self.input_partition_rows, int):
+                # Only load the index column to trigger a faster compute of the rows
+                total_rows = len(dataframe.index)
+                # +1 to handle any remainder rows
+                n_partitions = (total_rows // self.input_partition_rows) + 1
+                logger.info(
+                    f"Total number of rows is {total_rows}.\n"
+                    f"Repartitioning the data from {dataframe.npartitions} partitions to"
+                    f" {n_partitions} partitions such that the number of rows per partition is"
+                    f" approximately {self.input_partition_rows}",
+                )
+                dataframe = dataframe.repartition(npartitions=n_partitions)
+
+            elif self.input_partition_rows is None:
+                n_partitions = dataframe.npartitions
+                n_workers = os.cpu_count()
+                if n_partitions < n_workers:  # type: ignore
+                    logger.info(
+                        f"The number of partitions of the input dataframe is {n_partitions}. The "
+                        f"available number of workers is {n_workers}.",
+                    )
+                    dataframe = dataframe.repartition(npartitions=n_workers)
+                    logger.info(
+                        f"Repartitioning the data to {n_workers} partitions before processing"
+                        f" to maximize worker usage",
+                    )
+            else:
+                msg = (
+                    f"{self.input_partition_rows} is not a valid argument. Choose either "
+                    f"the number of rows to load per partition or set to 'disable' to disable "
+                    f"automated partitioning"
+                )
+                raise ValueError(
+                    msg,
+                )
+
+        return dataframe
+
     def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
         """
         Function that loads a subset from the manifest as a Dask dataframe.
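To summarize the branches handled above, here is a short sketch of how the loader can be constructed for each accepted value of `input_partition_rows`; `manifest` and `component_spec` are assumed to already exist (the executor normally builds them) and are not defined here:

```python
from fondant.data_io import DaskDataLoader

# Integer: aim for roughly 100 rows per partition.
loader = DaskDataLoader(
    manifest=manifest,
    component_spec=component_spec,
    input_partition_rows=100,
)

# None (default): repartition to the number of available workers
# when the dataframe has fewer partitions than workers.
loader = DaskDataLoader(manifest=manifest, component_spec=component_spec)

# "disable": leave the partitioning of the loaded dataframe untouched.
loader = DaskDataLoader(
    manifest=manifest,
    component_spec=component_spec,
    input_partition_rows="disable",
)
```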
@@ -80,15 +137,63 @@ def load_dataframe(self) -> dd.DataFrame:
             how="left",
         )
 
+        dataframe = self.partition_loaded_dataframe(dataframe)
+
         logging.info(f"Columns of dataframe: {list(dataframe.columns)}")
 
         return dataframe
 
 
 class DaskDataWriter(DataIO):
+    def __init__(
+        self,
+        *,
+        manifest: Manifest,
+        component_spec: ComponentSpec,
+        output_partition_size: t.Optional[str] = None,
+    ):
+        super().__init__(manifest=manifest, component_spec=component_spec)
+
+        self.output_partition_size = output_partition_size
+
+    def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Partition the written dataframe into smaller partitions based on a given
+        partition size.
+        """
+        if self.output_partition_size != "disable":
+            if isinstance(self.output_partition_size, str):
+                dataframe = dataframe.repartition(
+                    partition_size=self.output_partition_size,
+                )
+                logger.info(
+                    f"Repartitioning the written data such that the size per partition is approx."
+                    f" {self.output_partition_size}",
+                )
+
+            elif self.output_partition_size is None:
+                dataframe = dataframe.repartition(partition_size="250MB")
+                logger.info(
+                    "Repartitioning the written data such that the size per partition is approx."
+                    " 250MB (automatic repartitioning)",
+                )
+            else:
+                msg = (
+                    f"{self.output_partition_size} is not a valid argument. Choose either the"
+                    f" size of the partitions (e.g. '250MB') or set to 'disable' to"
+                    f" disable automated partitioning"
+                )
+                raise ValueError(
+                    msg,
+                )
+
+        return dataframe
+
     def write_dataframe(self, dataframe: dd.DataFrame) -> None:
         write_tasks = []
 
+        dataframe = self.partition_written_dataframe(dataframe)
+
         dataframe.index = dataframe.index.rename("id").astype("string")
 
         # Turn index into an empty dataframe so we can write it
diff --git a/src/fondant/executor.py b/src/fondant/executor.py
index fed4be5c3..a2afd78c9 100644
--- a/src/fondant/executor.py
+++ b/src/fondant/executor.py
@@ -25,6 +25,7 @@
 from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type
 from fondant.data_io import DaskDataLoader, DaskDataWriter
 from fondant.manifest import Manifest
+from fondant.schema import validate_partition_number, validate_partition_size
 
 logger = logging.getLogger(__name__)
 
@@ -39,32 +40,25 @@ def __init__(
         input_manifest_path: t.Union[str, Path],
         output_manifest_path: t.Union[str, Path],
         metadata: t.Dict[str, t.Any],
-        user_arguments: t.Dict[str, Argument],
+        user_arguments: t.Dict[str, t.Any],
+        input_partition_rows: t.Optional[t.Union[str, int]] = None,
+        output_partition_size: t.Optional[str] = None,
     ) -> None:
         self.spec = spec
         self.input_manifest_path = input_manifest_path
         self.output_manifest_path = output_manifest_path
         self.metadata = metadata
         self.user_arguments = user_arguments
-
-    @classmethod
-    def from_file(
-        cls,
-        path: t.Union[str, Path] = "../fondant_component.yaml",
-    ) -> "Executor":
-        """Create an executor from a component spec file.
- - Args: - path: Path to the component spec file - """ - component_spec = ComponentSpec.from_file(path) - return cls.from_spec(component_spec) + self.input_partition_rows = input_partition_rows + self.output_partition_size = output_partition_size @classmethod def from_args(cls) -> "Executor": """Create an executor from a passed argument containing the specification as a dict.""" parser = argparse.ArgumentParser() parser.add_argument("--component_spec", type=json.loads) + parser.add_argument("--input_partition_rows", type=validate_partition_number) + parser.add_argument("--output_partition_size", type=validate_partition_size) args, _ = parser.parse_known_args() if "component_spec" not in args: @@ -72,20 +66,37 @@ def from_args(cls) -> "Executor": raise ValueError(msg) component_spec = ComponentSpec(args.component_spec) + input_partition_rows = args.input_partition_rows + output_partition_size = args.output_partition_size - return cls.from_spec(component_spec) + return cls.from_spec( + component_spec, + input_partition_rows, + output_partition_size, + ) @classmethod - def from_spec(cls, component_spec: ComponentSpec) -> "Executor": + def from_spec( + cls, + component_spec: ComponentSpec, + input_partition_rows: t.Optional[t.Union[str, int]], + output_partition_size: t.Optional[str], + ) -> "Executor": """Create an executor from a component spec.""" args_dict = vars(cls._add_and_parse_args(component_spec)) if "component_spec" in args_dict: args_dict.pop("component_spec") + + if "input_partition_rows" in args_dict: + args_dict.pop("input_partition_rows") + + if "output_partition_size" in args_dict: + args_dict.pop("output_partition_size") + input_manifest_path = args_dict.pop("input_manifest_path") output_manifest_path = args_dict.pop("output_manifest_path") metadata = args_dict.pop("metadata") - metadata = json.loads(metadata) if metadata else {} return cls( @@ -94,6 +105,8 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor": output_manifest_path=output_manifest_path, metadata=metadata, user_arguments=args_dict, + input_partition_rows=input_partition_rows, + output_partition_size=output_partition_size, ) @classmethod @@ -166,7 +179,12 @@ def _execute_component( def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest): """Create a data writer given a manifest and writes out the index and subsets.""" - data_writer = DaskDataWriter(manifest=manifest, component_spec=self.spec) + data_writer = DaskDataWriter( + manifest=manifest, + component_spec=self.spec, + output_partition_size=self.output_partition_size, + ) + data_writer.write_dataframe(dataframe) def execute(self, component_cls: t.Type[Component]) -> None: @@ -249,7 +267,11 @@ def _execute_component( Returns: A `dd.DataFrame` instance with updated data based on the applied data transformations. """ - data_loader = DaskDataLoader(manifest=manifest, component_spec=self.spec) + data_loader = DaskDataLoader( + manifest=manifest, + component_spec=self.spec, + input_partition_rows=self.input_partition_rows, + ) dataframe = data_loader.load_dataframe() return component.transform(dataframe) @@ -310,7 +332,11 @@ def _execute_component( Returns: A `dd.DataFrame` instance with updated data based on the applied data transformations. 
""" - data_loader = DaskDataLoader(manifest=manifest, component_spec=self.spec) + data_loader = DaskDataLoader( + manifest=manifest, + component_spec=self.spec, + input_partition_rows=self.input_partition_rows, + ) dataframe = data_loader.load_dataframe() # Create meta dataframe with expected format @@ -366,7 +392,11 @@ def _execute_component( *, manifest: Manifest, ) -> None: - data_loader = DaskDataLoader(manifest=manifest, component_spec=self.spec) + data_loader = DaskDataLoader( + manifest=manifest, + component_spec=self.spec, + input_partition_rows=self.input_partition_rows, + ) dataframe = data_loader.load_dataframe() component.write(dataframe) diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 9693af05f..ec5450010 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -15,6 +15,7 @@ from fondant.exceptions import InvalidPipelineDefinition from fondant.import_utils import is_kfp_available from fondant.manifest import Manifest +from fondant.schema import validate_partition_number, validate_partition_size if is_kfp_available(): import kfp @@ -32,6 +33,10 @@ class ComponentOp: Arguments: component_dir: The path to the component directory. arguments: A dictionary containing the argument name and value for the operation. + input_partition_rows: The number of rows to load per partition. Set to override the + automatic partitioning + output_partition_size: the size of the output written dataset. Defaults to 250MB, + set to "disable" to disable automatic repartitioning of the output, number_of_gpus: The number of gpus to assign to the operation node_pool_name: The name of the node pool to which the operation will be assigned. p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths, @@ -57,13 +62,17 @@ def __init__( component_dir: t.Union[str, Path], *, arguments: t.Optional[t.Dict[str, t.Any]] = None, + input_partition_rows: t.Optional[t.Union[str, int]] = None, + output_partition_size: t.Optional[str] = None, number_of_gpus: t.Optional[int] = None, node_pool_name: t.Optional[str] = None, p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None, ephemeral_storage_size: t.Optional[str] = None, ) -> None: self.component_dir = Path(component_dir) - self.arguments = arguments or {} + self.input_partition_rows = input_partition_rows + self.output_partitioning_size = output_partition_size + self.arguments = self._set_arguments(arguments) self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, @@ -75,6 +84,23 @@ def __init__( self.p_volumes = p_volumes self.ephemeral_storage_size = ephemeral_storage_size + def _set_arguments( + self, + arguments: t.Optional[t.Dict[str, t.Any]], + ) -> t.Dict[str, t.Any]: + """Set component arguments based on provided arguments and relevant ComponentOp + parameters. 
+ """ + arguments = arguments or {} + + input_partition_rows = validate_partition_number(self.input_partition_rows) + output_partition_size = validate_partition_size(self.output_partitioning_size) + + arguments["input_partition_rows"] = str(input_partition_rows) + arguments["output_partition_size"] = str(output_partition_size) + + return arguments + @property def dockerfile_path(self) -> t.Optional[Path]: path = self.component_dir / "Dockerfile" @@ -86,6 +112,8 @@ def from_registry( name: str, *, arguments: t.Optional[t.Dict[str, t.Any]] = None, + input_partition_rows: t.Optional[t.Union[int, str]] = None, + output_partition_size: t.Optional[str] = None, number_of_gpus: t.Optional[int] = None, node_pool_name: t.Optional[str] = None, p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None, @@ -96,6 +124,10 @@ def from_registry( Args: name: Name of the component to load arguments: A dictionary containing the argument name and value for the operation. + input_partition_rows: The number of rows to load per partition. Set to override the + automatic partitioning + output_partition_size: the size of the output written dataset. Defaults to 250MB, + set to "disable" to disable automatic repartitioning of the output, number_of_gpus: The number of gpus to assign to the operation node_pool_name: The name of the node pool to which the operation will be assigned. p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount @@ -113,6 +145,8 @@ def from_registry( return ComponentOp( components_dir, arguments=arguments, + input_partition_rows=input_partition_rows, + output_partition_size=output_partition_size, number_of_gpus=number_of_gpus, node_pool_name=node_pool_name, p_volumes=p_volumes, diff --git a/src/fondant/schema.py b/src/fondant/schema.py index ab4f116d0..eb295b50f 100644 --- a/src/fondant/schema.py +++ b/src/fondant/schema.py @@ -2,6 +2,7 @@ and pipelines. """ +import re import typing as t import pyarrow as pa @@ -154,3 +155,28 @@ class Field(t.NamedTuple): name: str type: Type + + +def validate_partition_number(arg_value): + if arg_value in ["disable", None, "None"]: + return arg_value + try: + return int(arg_value) + except ValueError: + msg = f"Invalid format for '{arg_value}'. The value must be an integer or set to 'disable'" + raise InvalidTypeSchema(msg) + + +def validate_partition_size(arg_value): + if arg_value in ["disable", None, "None"]: + return arg_value + + file_size_pattern = r"^\d+(?:\.\d+)?(?:KB|MB|GB|TB)$" + if not bool(re.match(file_size_pattern, arg_value, re.I)): + msg = ( + f"Invalid partition size defined `{arg_value}`, partition size must be a string f" + f"ollowed by a file size notation e.g. 
('250MB') or set to 'disable' to disable" + f" the automatic partitioning" + ) + raise InvalidTypeSchema(msg) + return arg_value diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index 14c1a23aa..cae66f6fb 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -11,6 +11,10 @@ services: - /foo/bar/first_component/manifest.json - --storage_args - a dummy string arg + - --input_partition_rows + - disable + - --output_partition_size + - disable - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -29,6 +33,10 @@ services: - /foo/bar/second_component/manifest.json - --storage_args - a dummy string arg + - --input_partition_rows + - '10' + - --output_partition_size + - 30MB - --component_spec - '{"name": "Second component", "description": "This is an example component", "image": "example_component:latest", "consumes": {"images": {"fields": {"data": @@ -54,6 +62,10 @@ services: - a dummy string arg - --some_list - '[1, 2, 3]' + - --input_partition_rows + - None + - --output_partition_size + - None - --component_spec - '{"name": "Third component", "description": "This is an example component", "image": "example_component:latest", "consumes": {"images": {"fields": {"data": diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index 441bc13ee..53be24617 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -11,6 +11,10 @@ services: - /foo/bar/first_component/manifest.json - --storage_args - a dummy string arg + - --input_partition_rows + - None + - --output_partition_size + - None - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -28,6 +32,10 @@ services: - '0' - --padding - '0' + - --input_partition_rows + - None + - --output_partition_size + - None - --component_spec - '{"name": "Image cropping", "description": "Component that removes single-colored borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", @@ -46,4 +54,4 @@ services: condition: service_completed_successfully image: ghcr.io/ml6team/image_cropping:dev volumes: [] -version: '3.8' \ No newline at end of file +version: '3.8' diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index 5f2e33690..aa74a9de0 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -11,6 +11,16 @@ inputs: description: The component specification as a dictionary type: JsonObject default: None +- name: input_partition_rows + description: The number of rows to load per partition. Set to override the automatic + partitioning + type: String + default: None +- name: output_partition_size + description: The size of the output partition size, defaults to 250MB. 
Set to + `disable` to disable the automatic partitioning + type: String + default: None - name: storage_args description: Storage arguments type: String @@ -30,7 +40,11 @@ implementation: - inputValue: metadata - --component_spec - inputValue: component_spec + - --input_partition_rows + - inputValue: input_partition_rows + - --output_partition_size + - inputValue: output_partition_size - --storage_args - inputValue: storage_args - --output_manifest_path - - outputPath: output_manifest_path \ No newline at end of file + - outputPath: output_manifest_path diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 32a80412b..4bdfba732 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -17,10 +17,14 @@ ComponentOp( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, + input_partition_rows="disable", + output_partition_size="disable", ), ComponentOp( Path(COMPONENTS_PATH / "example_1" / "second_component"), arguments={"storage_args": "a dummy string arg"}, + input_partition_rows="10", + output_partition_size="30MB", ), ComponentOp( Path(COMPONENTS_PATH / "example_1" / "fourth_component"), diff --git a/tests/test_component.py b/tests/test_component.py index e7ed11d08..d051ccb4f 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -86,6 +86,10 @@ def test_component_arguments(): str(components_path / "arguments/output_manifest.json"), "--component_spec", yaml_file_to_json_string(components_path / "arguments/component.yaml"), + "--input_partition_rows", + "100", + "--output_partition_size", + "100MB", "--override_default_arg", "bar", "--override_default_none_arg", @@ -104,6 +108,9 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: pass executor = MyExecutor.from_args() + expected_partition_row_arg = 100 + assert executor.input_partition_rows == expected_partition_row_arg + assert executor.output_partition_size == "100MB" assert executor.user_arguments == { "string_default_arg": "foo", "integer_default_arg": 0, @@ -126,7 +133,7 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: @pytest.mark.usefixtures("_patched_data_writing") def test_load_component(): - # Mock CLI argumentsload + # Mock CLI arguments load sys.argv = [ "", "--metadata", @@ -156,6 +163,8 @@ def load(self): return dd.DataFrame.from_dict(data, npartitions=N_PARTITIONS) executor = DaskLoadExecutor.from_args() + assert executor.input_partition_rows is None + assert executor.output_partition_size is None load = patch_method_class(MyLoadComponent.load) with mock.patch.object(MyLoadComponent, "load", load): executor.execute(MyLoadComponent) @@ -175,6 +184,10 @@ def test_dask_transform_component(): "success", "--value", "1", + "--input_partition_rows", + "disable", + "--output_partition_size", + "disable", "--output_manifest_path", str(components_path / "output_manifest.json"), "--component_spec", @@ -193,6 +206,8 @@ def transform(self, dataframe): return dataframe executor = DaskTransformExecutor.from_args() + assert executor.input_partition_rows == "disable" + assert executor.output_partition_size == "disable" transform = patch_method_class(MyDaskComponent.transform) with mock.patch.object( MyDaskComponent, diff --git a/tests/test_data_io.py b/tests/test_data_io.py index 7d42b2e1d..ff2920474 100644 --- a/tests/test_data_io.py +++ b/tests/test_data_io.py @@ -1,3 +1,4 @@ +import os from pathlib import Path import dask.dataframe as dd @@ -59,18 +60,56 @@ def 
test_load_dataframe(manifest, component_spec): assert dataframe.index.name == "id" +def test_load_dataframe_default(manifest, component_spec): + """Test merging of subsets in a dataframe based on a component_spec.""" + dl = DaskDataLoader(manifest=manifest, component_spec=component_spec) + dataframe = dl.load_dataframe() + number_workers = os.cpu_count() + # repartitioning in dask is an approximation + assert dataframe.npartitions in list(range(number_workers - 1, number_workers + 2)) + + +def test_load_dataframe_rows(manifest, component_spec): + """Test merging of subsets in a dataframe based on a component_spec.""" + dl = DaskDataLoader( + manifest=manifest, + component_spec=component_spec, + input_partition_rows=100, + ) + dataframe = dl.load_dataframe() + expected_partitions = 2 # dataset with 151 rows + assert dataframe.npartitions == expected_partitions + + +def test_load_dataframe_disable(manifest, component_spec): + """Test merging of subsets in a dataframe based on a component_spec.""" + dl = DaskDataLoader( + manifest=manifest, + component_spec=component_spec, + input_partition_rows="disable", + ) + dataframe = dl.load_dataframe() + expected_partitions = 3 # original partitions + assert dataframe.npartitions == expected_partitions + + def test_write_index(tmp_path_factory, dataframe, manifest, component_spec): """Test writing out the index.""" with tmp_path_factory.mktemp("temp") as fn: # override the base path of the manifest with the temp dir manifest.update_metadata("base_path", str(fn)) - data_writer = DaskDataWriter(manifest=manifest, component_spec=component_spec) + data_writer = DaskDataWriter( + manifest=manifest, + component_spec=component_spec, + output_partition_size="1TB", + ) # write out index to temp dir data_writer.write_dataframe(dataframe) # read written data and assert dataframe = dd.read_parquet(fn / "index") assert len(dataframe) == NUMBER_OF_TEST_ROWS assert dataframe.index.name == "id" + assert dataframe.npartitions == 1 def test_write_subsets(tmp_path_factory, dataframe, manifest, component_spec): @@ -126,7 +165,12 @@ def test_write_divisions( with tmp_path_factory.mktemp("temp") as fn: manifest.update_metadata("base_path", str(fn)) - data_writer = DaskDataWriter(manifest=manifest, component_spec=component_spec) + data_writer = DaskDataWriter( + manifest=manifest, + component_spec=component_spec, + output_partition_size="disable", + ) + data_writer.write_dataframe(dataframe) for target in ["properties", "types", "index"]: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index cbd19e843..7086d78ed 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -3,7 +3,7 @@ import pytest import yaml -from fondant.exceptions import InvalidPipelineDefinition +from fondant.exceptions import InvalidPipelineDefinition, InvalidTypeSchema from fondant.pipeline import ComponentOp, Pipeline valid_pipeline_path = Path(__file__).parent / "example_pipelines/valid_pipeline" @@ -23,6 +23,49 @@ def default_pipeline_args(): } +@pytest.mark.parametrize( + "valid_pipeline_example", + [ + ( + "example_1", + ["first_component", "second_component", "third_component"], + ), + ], +) +def test_component_op( + valid_pipeline_example, +): + component_args = {"storage_args": "a dummy string arg"} + example_dir, component_names = valid_pipeline_example + components_path = Path(valid_pipeline_path / example_dir) + + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size=None, + ) + + ComponentOp( + 
Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size="250MB", + ) + + with pytest.raises(InvalidTypeSchema): + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size="10", + ) + + with pytest.raises(InvalidTypeSchema): + ComponentOp( + Path(components_path / component_names[0]), + arguments=component_args, + output_partition_size="250 MB", + ) + + @pytest.mark.parametrize( "valid_pipeline_example", [