Introduce repartitioning #309

Merged · 37 commits · Jul 25, 2023

Commits
2683b93
optimize image download component
PhilippeMoussalli Jul 11, 2023
78a60c2
Merge branch 'main' into improve-imagedownload
PhilippeMoussalli Jul 11, 2023
1b7521c
add dask diagnostics to fondant
PhilippeMoussalli Jul 17, 2023
1c53aa1
add repartitioning strategy to fondant
PhilippeMoussalli Jul 18, 2023
897f5f8
remove changes from component
PhilippeMoussalli Jul 18, 2023
0c48233
Merge branch 'main' into improve-imagedownload
PhilippeMoussalli Jul 18, 2023
2edcd91
change save path diagnostics
PhilippeMoussalli Jul 19, 2023
d63eed7
add client to dataIO
PhilippeMoussalli Jul 19, 2023
51cadfe
remove progressbar
PhilippeMoussalli Jul 19, 2023
678b3e1
move client
PhilippeMoussalli Jul 19, 2023
44d5493
move dataframe visualization
PhilippeMoussalli Jul 19, 2023
592f3ae
add bokeh to dependencies
PhilippeMoussalli Jul 19, 2023
5c56ddf
add bokeh to dependencies
PhilippeMoussalli Jul 19, 2023
8e575c7
add bokeh to dependencies
PhilippeMoussalli Jul 19, 2023
0d1e76b
move client to execute
PhilippeMoussalli Jul 19, 2023
43132f5
silence logs
PhilippeMoussalli Jul 19, 2023
7f38c31
remove log supression
PhilippeMoussalli Jul 19, 2023
d764ad4
add log silencing to cluster
PhilippeMoussalli Jul 19, 2023
d50cc6b
add worker log filter
PhilippeMoussalli Jul 19, 2023
9e1eb31
supress logs
PhilippeMoussalli Jul 19, 2023
f344ea8
supress logs
PhilippeMoussalli Jul 19, 2023
08b74cc
add output_partition_size as an argument and revert diagnosis
PhilippeMoussalli Jul 19, 2023
df19ad3
remove output_partition from user arguments
PhilippeMoussalli Jul 19, 2023
09ef925
enable more control over partitioning
PhilippeMoussalli Jul 19, 2023
6b1e63b
fix test
PhilippeMoussalli Jul 19, 2023
57a8af4
Merge branch 'main' into introduce-repartioning
PhilippeMoussalli Jul 19, 2023
8715369
correct load partitioning
PhilippeMoussalli Jul 19, 2023
c15e24c
address PR feedback
PhilippeMoussalli Jul 24, 2023
9ff1e68
Adjust argument passing
PhilippeMoussalli Jul 24, 2023
4c8d482
debug
PhilippeMoussalli Jul 24, 2023
5bf3a50
Further debugging
PhilippeMoussalli Jul 24, 2023
18e1f79
Further debugging
PhilippeMoussalli Jul 24, 2023
b2bdecc
Revert debug and improve logging
PhilippeMoussalli Jul 24, 2023
951a786
Merge branch 'main' into introduce-repartioning
PhilippeMoussalli Jul 24, 2023
3394fcf
fix indentation
PhilippeMoussalli Jul 24, 2023
02561ab
fix tests
PhilippeMoussalli Jul 24, 2023
99746dc
add docs
PhilippeMoussalli Jul 25, 2023
67 changes: 66 additions & 1 deletion docs/pipeline.md
@@ -8,7 +8,7 @@ To build a pipeline, you need to define a set of component operations called `Co

The component specifications include the location of the Docker image in a registry.

The runtime configuration consists of the component's arguments and the definition of node pools and resources. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.
The runtime configuration consists of the component's arguments and the definition of node pools, resources, and a custom partitioning specification. For example, if a component requires a GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.

Here is an example of how to build a pipeline using Fondant:
```python
@@ -46,7 +46,72 @@ Next, we define two operations: `load_from_hub_op`, which is based on a reusable
!!! note "IMPORTANT"
Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.

## Setting custom partitioning parameters

When working with Fondant, each component processes a dataset, and Dask is used internally
to handle datasets larger than the available memory. To achieve this, the data is divided
into smaller chunks called "partitions" that can be processed in parallel. A sufficient number of partitions
enables parallel processing, where multiple workers process different partitions simultaneously,
while smaller partitions ensure that each one fits into memory.
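
To make the idea concrete, here is a minimal standalone sketch of Dask partitioning (plain Dask, not Fondant-specific):

```python
import pandas as pd
import dask.dataframe as dd

# A small pandas DataFrame split into 4 Dask partitions.
pdf = pd.DataFrame({"image_url": [f"https://example.com/{i}.jpg" for i in range(1000)]})
ddf = dd.from_pandas(pdf, npartitions=4)

print(ddf.npartitions)  # 4
# Each partition can be handled by a different worker in parallel.
print(ddf.map_partitions(len).compute())  # rows per partition: 250 each
```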

### How Fondant handles partitions

**1) Repartitioning the Loaded DataFrame:** This step is optional and comes into play if the number
of partitions is smaller than the number of available workers on the data processing instance.
By repartitioning, the maximum number of workers can be utilized, leading to faster,
parallel processing.

**2) Repartitioning the Written DataFrame:** The written dataframe is also repartitioned into
smaller partitions (250MB by default) so that the next component can load these partitions into memory.
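
In condensed form, the two strategies look roughly like this (a sketch mirroring the `DaskDataLoader` and `DaskDataWriter` logic added in this PR; the helper names are illustrative):

```python
import os

import dask.dataframe as dd


def repartition_loaded(ddf: dd.DataFrame) -> dd.DataFrame:
    # 1) Ensure there are at least as many partitions as available workers.
    n_workers = os.cpu_count() or 1
    if ddf.npartitions < n_workers:
        ddf = ddf.repartition(npartitions=n_workers)
    return ddf


def repartition_written(ddf: dd.DataFrame) -> dd.DataFrame:
    # 2) Cap the partition size so the next component can load each one into memory.
    return ddf.repartition(partition_size="250MB")
```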


### Customizing partitioning

By default, Fondant automatically handles the partitioning, but you can disable this and create your
own custom partitioning logic if you have specific requirements.

Here's an example of disabling the automatic partitioning:

```python
caption_images_op = ComponentOp(
    component_dir="components/captioning_component",
    arguments={
        "model_id": "Salesforce/blip-image-captioning-base",
        "batch_size": 2,
        "max_new_tokens": 50,
    },
    input_partition_rows="disable",
    output_partition_size="disable",
)
```

The code snippet above disables automatic partitioning for both the loaded and written dataframes,
allowing you to define your own partitioning logic inside the components.
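
For example, with automatic partitioning disabled, a component could repartition the data itself. A minimal sketch, assuming a Dask-based transform component (the `DaskTransformComponent` base class and the target size here are illustrative):

```python
import dask.dataframe as dd

from fondant.component import DaskTransformComponent  # assumed base class


class CustomPartitionedComponent(DaskTransformComponent):
    """Illustrative component that applies its own partitioning logic."""

    def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
        # Custom logic: target ~50MB partitions instead of the defaults.
        return dataframe.repartition(partition_size="50MB")
```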

Moreover, you have the flexibility to set your own custom partitioning parameters to override the default settings:

```python
caption_images_op = ComponentOp(
    component_dir="components/captioning_component",
    arguments={
        "model_id": "Salesforce/blip-image-captioning-base",
        "batch_size": 2,
        "max_new_tokens": 50,
    },
    input_partition_rows=100,
    output_partition_size="10MB",
)
```

Comment on lines +103 to +104:

Contributor: Any reason the input partitions are specified in terms of rows and the output partitions in terms of size?

Contributor (author): The output partition size ensures that the written partitions are small and can be easily loaded by the next component. The input is defined in rows to allow you to easily iterate on it if you run into out-of-memory issues, for example when retrieving 100 images from URLs. It's more intuitive to lower that number (to 10 rows, for example) than to change the size of the input partitions.

Contributor: Ya, I do feel using num rows is way more intuitive than size.

In the example above, each partition of the loaded dataframe will contain approximately one hundred rows,
and the size of the output partitions will be around 10MB. This capability is useful in scenarios
where processing one row significantly increases the number of rows in the dataset
(resulting in dataset explosion) or causes a substantial increase in row size (e.g., fetching images from URLs).

By setting a lower value for input partition rows, you can mitigate issues where the processed data
grows larger than the available memory before being written to disk.
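
For instance, a component that fetches images from URLs could use a much smaller input partition. The component directory and arguments below are hypothetical; only the partitioning parameter follows this PR's API:

```python
# Hypothetical configuration illustrating a low input_partition_rows value.
download_images_op = ComponentOp(
    component_dir="components/download_images",  # illustrative component
    arguments={"timeout": 10},  # illustrative argument
    input_partition_rows=10,  # small partitions: image bytes inflate row size
)
```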

## Compiling a pipeline

1 change: 1 addition & 0 deletions examples/pipelines/controlnet-interior-design/pipeline.py
@@ -17,6 +17,7 @@
generate_prompts_op = ComponentOp(
    component_dir="components/generate_prompts",
    arguments={"n_rows_to_load": None},
    output_partition_size="disable",
)
laion_retrieval_op = ComponentOp.from_registry(
    name="prompt_based_laion_retrieval",
14 changes: 7 additions & 7 deletions src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
        raise NotImplementedError


class DaskWriteComponent(BaseComponent):
    """Component that accepts a Dask DataFrame and writes its contents."""

    def write(self, dataframe: dd.DataFrame) -> None:
        raise NotImplementedError


Comment on lines +44 to +50:

Member: This was just moved in the file? I think both orders can be logical (Dask -> Pandas) or (Read -> Transform -> Write).

class PandasTransformComponent(BaseComponent):
    """Component that transforms the incoming dataset partition per partition as a pandas
    DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        raise NotImplementedError


class DaskWriteComponent(BaseComponent):
    """Component that accepts a Dask DataFrame and writes its contents."""

    def write(self, dataframe: dd.DataFrame) -> None:
        raise NotImplementedError


Component = t.TypeVar("Component", bound=BaseComponent)
"""Component type which can represents any of the subclasses of BaseComponent"""
18 changes: 18 additions & 0 deletions src/fondant/component_spec.py
@@ -256,6 +256,20 @@ def from_fondant_component_spec(
"type": "JsonObject",
"default": "None",
},
{
"name": "input_partition_rows",
"description": "The number of rows to load per partition. Set to override the"
" automatic partitioning",
"type": "String",
"default": "None",
},
{
"name": "output_partition_size",
"description": "The size of the output partition size, defaults"
" to 250MB. Set to `disable` to disable the automatic partitioning",
"type": "String",
"default": "None",
},
*(
{
"name": arg.name,
@@ -285,6 +299,10 @@
{"inputValue": "metadata"},
"--component_spec",
{"inputValue": "component_spec"},
"--input_partition_rows",
{"inputValue": "input_partition_rows"},
"--output_partition_size",
{"inputValue": "output_partition_size"},
*cls._dump_args(fondant_component.args.values()),
"--output_manifest_path",
{"outputPath": "output_manifest_path"},
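For context, a sketch of how a component executor might consume these two new arguments (illustrative only; Fondant's actual executor code is not part of this diff):

```python
import argparse

# Parse the two arguments added to every component spec in this PR.
parser = argparse.ArgumentParser()
parser.add_argument("--input_partition_rows", type=str, default=None)
parser.add_argument("--output_partition_size", type=str, default=None)
args, _ = parser.parse_known_args()

# "disable" switches automatic repartitioning off; a number or a size
# string (e.g. "10MB") overrides the defaults.
print(args.input_partition_rows, args.output_partition_size)
```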
105 changes: 105 additions & 0 deletions src/fondant/data_io.py
@@ -1,4 +1,5 @@
import logging
import os
import typing as t

import dask.dataframe as dd
@@ -17,6 +18,62 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None


class DaskDataLoader(DataIO):
    def __init__(
        self,
        *,
        manifest: Manifest,
        component_spec: ComponentSpec,
        input_partition_rows: t.Optional[t.Union[int, str]] = None,
    ):
        super().__init__(manifest=manifest, component_spec=component_spec)
        self.input_partition_rows = input_partition_rows

    def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
        """
        Function that partitions the loaded dataframe depending on its partitions and the
        available workers.

        Returns:
            The partitioned dataframe.
        """
        if self.input_partition_rows != "disable":
            if isinstance(self.input_partition_rows, int):
                # Only load the index column to trigger a faster compute of the rows
                total_rows = len(dataframe.index)
                # +1 to handle any remainder rows
                n_partitions = (total_rows // self.input_partition_rows) + 1
                logger.info(
                    f"Total number of rows is {total_rows}.\n"
                    f"Repartitioning the data from {dataframe.npartitions} partitions to"
                    f" {n_partitions} such that the number of rows per partition is"
                    f" approximately {self.input_partition_rows}",
                )
                dataframe = dataframe.repartition(npartitions=n_partitions)

            elif self.input_partition_rows is None:
                n_partitions = dataframe.npartitions
                n_workers = os.cpu_count()
                if n_partitions < n_workers:  # type: ignore
                    logger.info(
                        f"The number of partitions of the input dataframe is {n_partitions}."
                        f" The available number of workers is {n_workers}.",
                    )
                    dataframe = dataframe.repartition(npartitions=n_workers)
                    logger.info(
                        f"Repartitioning the data to {n_workers} partitions before processing"
                        f" to maximize worker usage",
                    )
            else:
                msg = (
                    f"{self.input_partition_rows} is not a valid argument. Choose either "
                    f"the number of rows per partition or set to 'disable' to disable "
                    f"automated partitioning"
                )
                raise ValueError(msg)

        return dataframe

    def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
        """
        Function that loads a subset from the manifest as a Dask dataframe.
@@ -80,15 +137,63 @@ def load_dataframe(self) -> dd.DataFrame:
how="left",
)

dataframe = self.partition_loaded_dataframe(dataframe)

logging.info(f"Columns of dataframe: {list(dataframe.columns)}")

return dataframe


class DaskDataWriter(DataIO):
    def __init__(
        self,
        *,
        manifest: Manifest,
        component_spec: ComponentSpec,
        output_partition_size: t.Optional[str] = None,
    ):
        super().__init__(manifest=manifest, component_spec=component_spec)

        self.output_partition_size = output_partition_size

    def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
        """
        Function that repartitions the written dataframe into smaller partitions based on a
        given partition size.
        """
        if self.output_partition_size != "disable":
            if isinstance(self.output_partition_size, str):
                dataframe = dataframe.repartition(
                    partition_size=self.output_partition_size,
                )
                logger.info(
                    f"Repartitioning the written data such that the size per partition is"
                    f" approx. {self.output_partition_size}",
                )

            elif self.output_partition_size is None:
                dataframe = dataframe.repartition(partition_size="250MB")
                logger.info(
                    "Repartitioning the written data such that the size per partition is"
                    " approx. 250MB. (Automatic repartitioning)",
                )
            else:
                msg = (
                    f"{self.output_partition_size} is not a valid argument. Choose either a"
                    f" partition size (e.g. '250MB') or set to 'disable' to disable"
                    f" automated partitioning"
                )
                raise ValueError(msg)

        return dataframe

    def write_dataframe(self, dataframe: dd.DataFrame) -> None:
        write_tasks = []

        dataframe = self.partition_written_dataframe(dataframe)

        dataframe.index = dataframe.index.rename("id").astype("string")

        # Turn index into an empty dataframe so we can write it