add docs
PhilippeMoussalli committed Jul 25, 2023
1 parent 02561ab commit 99746dc
Showing 4 changed files with 72 additions and 15 deletions.
67 changes: 66 additions & 1 deletion docs/pipeline.md
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ To build a pipeline, you need to define a set of component operations called `Co

The component specifications include the location of the Docker image in a registry.

The runtime configuration consists of the component's arguments and the definition of node pools and resources. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.
The runtime configuration consists of the component's arguments and the definition of node pools, resources and custom partitioning specification. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.

Here is an example of how to build a pipeline using Fondant:
```python
# (pipeline example collapsed in this diff view)
```

@@ -46,7 +46,72 @@ Next, we define two operations: `load_from_hub_op`, which is based on a reus
!!! note "IMPORTANT"
Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.

## Setting custom partitioning parameters

When working with Fondant, each component operates on datasets, and Dask is used internally
to handle datasets larger than the available memory. To achieve this, the data is divided
into smaller chunks called "partitions" that can be processed in parallel. A sufficient number
of partitions enables parallel processing, with multiple workers handling different partitions
simultaneously, while smaller partitions are more likely to fit into memory.

### How Fondant handles partitions

**1) Repartitioning the Loaded DataFrame:** This optional step comes into play when the number
of partitions is lower than the number of available workers on the data processing instance.
Repartitioning ensures that the maximum number of workers is utilized, enabling faster
parallel processing.

**2) Repartitioning the Written DataFrame:** The written dataframe is also repartitioned into
smaller partitions (250MB by default) so that the next component can load them into memory.


### Customizing Partitioning

By default, Fondant automatically handles the partitioning, but you can disable this and create your
own custom partitioning logic if you have specific requirements.

Here's an example of disabling the automatic partitioning:

```python
caption_images_op = ComponentOp(
    component_dir="components/captioning_component",
    arguments={
        "model_id": "Salesforce/blip-image-captioning-base",
        "batch_size": 2,
        "max_new_tokens": 50,
    },
    input_partition_rows="disable",
    output_partition_size="disable",
)
```

The code snippet above disables automatic partitioning for both the loaded and written dataframes,
allowing you to define your own partitioning logic inside the components.

You can also set custom partitioning parameters to override the default settings:

```python
caption_images_op = ComponentOp(
    component_dir="components/captioning_component",
    arguments={
        "model_id": "Salesforce/blip-image-captioning-base",
        "batch_size": 2,
        "max_new_tokens": 50,
    },
    input_partition_rows=100,
    output_partition_size="10MB",
)
```

In the example above, each partition of the loaded dataframe will contain approximately 100 rows,
and each output partition will be around 10MB in size. This is useful in scenarios
where processing a single row significantly increases the number of rows in the dataset
(dataset explosion) or substantially increases the row size (e.g., fetching images from URLs).

By setting a lower value for input partition rows, you can prevent the processed data from
growing larger than the available memory before it is written to disk.
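As a hypothetical illustration of such a dataset explosion (the `explode_rows` helper below is an invented example, not part of Fondant), consider a step that turns every input row into ten output rows:

```python
import pandas as pd

def explode_rows(partition: pd.DataFrame) -> pd.DataFrame:
    # e.g. one prompt row becomes 10 retrieved-image rows
    return partition.loc[partition.index.repeat(10)].reset_index(drop=True)

small = pd.DataFrame({"prompt": ["an interior design prompt"] * 100})
print(len(explode_rows(small)))  # 1000

# Loading only 100 input rows per partition keeps each in-memory chunk
# bounded, even though the output is 10x larger than the input.
```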

## Compiling a pipeline

@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=09ef9254fef5d382d7d60d97b66fa2ac1e0df7e0
ARG FONDANT_VERSION=main
RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
10 changes: 5 additions & 5 deletions examples/pipelines/controlnet-interior-design/pipeline.py
@@ -72,8 +72,8 @@
pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH)

pipeline.add_op(generate_prompts_op)
# pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op)
# pipeline.add_op(download_images_op, dependencies=laion_retrieval_op)
# pipeline.add_op(caption_images_op, dependencies=download_images_op)
# pipeline.add_op(segment_images_op, dependencies=caption_images_op)
# pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op)
pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op)
pipeline.add_op(download_images_op, dependencies=laion_retrieval_op)
pipeline.add_op(caption_images_op, dependencies=download_images_op)
pipeline.add_op(segment_images_op, dependencies=caption_images_op)
pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op)
8 changes: 0 additions & 8 deletions tests/test_component_specs.py
@@ -71,14 +71,6 @@ def test_kfp_component_creation(valid_fondant_schema, valid_kubeflow_schema):
"""Test that the created kubeflow component matches the expected kubeflow component."""
fondant_component = ComponentSpec(valid_fondant_schema)
kubeflow_component = fondant_component.kubeflow_specification
with open("data.yml", "w") as outfile:
yaml.dump(
kubeflow_component._specification,
outfile,
default_flow_style=False,
sort_keys=False,
indent=4,
)
assert kubeflow_component._specification == valid_kubeflow_schema

