
Introduce repartitioning #309

Merged
merged 37 commits into from
Jul 25, 2023
Conversation

Contributor

@PhilippeMoussalli PhilippeMoussalli commented Jul 19, 2023

PR that introduces the partitioning strategy discussed in #288

  1. The automatic behavior is as follows for all component types (Dask, pandas):
  • The written dataframe is repartitioned to 250 MB partitions.
  • The loaded dataframe is repartitioned depending on the current number of partitions and workers.
  2. The behavior above can be overridden by the end user if they want to implement their own custom logic. This is done at the ComponentOp level via additional flag parameters that can be passed. See the docs added with this PR for more details.

I will handle adding the diagnostic tools and optimizing the downloader component in a separate PR.
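As a rough illustration of the load-side rule described above, the decision boils down to the following (a minimal sketch with hypothetical names, not the actual Fondant API):

```python
def target_npartitions(n_partitions: int, n_workers: int) -> int:
    """Pick the partition count to load with: scale up to the worker count
    when there are fewer partitions than workers (so every worker gets
    work), otherwise keep the current partitioning."""
    if n_partitions < n_workers:
        return n_workers
    return n_partitions
```

In Dask terms, the result would be passed to dataframe.repartition(npartitions=...) only when it differs from the current partition count.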

@PhilippeMoussalli PhilippeMoussalli requested review from RobbeSneyders and GeorgesLorre and removed request for RobbeSneyders and GeorgesLorre July 19, 2023 15:01
f"available number of workers is {n_workers}.",
)
if n_partitions < n_workers:
dataframe = dataframe.repartition(npartitions=n_partitions)
Contributor

Hmm, we first get n_partitions = dataframe.npartitions and then we repartition using the same number?

Can you explain?

Contributor Author

Whoops, it should be dataframe = dataframe.repartition(npartitions=n_workers).
Good catch!

Member

@RobbeSneyders RobbeSneyders left a comment


Thanks @PhilippeMoussalli! Some comments.

@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
ARG FONDANT_VERSION=09ef9254fef5d382d7d60d97b66fa2ac1e0df7e0
Member

Should be reverted before merging.

Comment on lines 76 to 80
# pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op)
# pipeline.add_op(download_images_op, dependencies=laion_retrieval_op)
# pipeline.add_op(caption_images_op, dependencies=download_images_op)
# pipeline.add_op(segment_images_op, dependencies=caption_images_op)
# pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op)
Member

Should be reverted before merging.

Comment on lines +44 to +50
class DaskWriteComponent(BaseComponent):
"""Component that accepts a Dask DataFrame and writes its contents."""

def write(self, dataframe: dd.DataFrame) -> None:
raise NotImplementedError


Member

This was just moved in the file? I think both orders can be logical: (Dask -> Pandas) or (Read -> Transform -> Write).

Comment on lines 259 to 265
{
"name": "output_partition_size",
"description": "The size of the output partition size, defaults"
" to 250MB. Set to `disable` to disable the automatic partitioning",
"type": "String",
"default": "250MB",
},
Member

I don't think it's the output partitioning we need to make dynamic, as this will only impact the following component. I think the user should be able to overwrite the input partitioning, so the partitions can be made small at the start and still fit in memory when the data grows.

Member

And I think it would be ideal if the user could specify it in rows instead of MB, but not sure if that's possible.
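For what it's worth, a row-based target can be translated into a partition count when the total row count is known. A hypothetical sketch (not Fondant code, names are illustrative):

```python
import math

def npartitions_for_rows(total_rows: int, rows_per_partition: int) -> int:
    """Number of partitions needed so each one holds at most
    rows_per_partition rows (always at least one partition)."""
    return max(1, math.ceil(total_rows / rows_per_partition))
```

In Dask, this count could then be passed to dataframe.repartition(npartitions=...), at the cost of computing the total row count first.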

Comment on lines 21 to 22
def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec):
super().__init__(manifest=manifest, component_spec=component_spec)
Member

No need to override this if you're just calling super.

f"The number of partitions of the input dataframe is {n_partitions}. The "
f"available number of workers is {n_workers}.",
)
if n_partitions < n_workers:
Member

This would then become:

Suggested change
if n_partitions < n_workers:
if input_partition_size:
dataframe.repartition(partition_size=input_partition_size)
elif n_partitions < n_workers:

@@ -159,6 +210,8 @@ def _write_subset(

schema = {field.name: field.type.value for field in subset_spec.fields.values()}

dataframe = self.partition_written_dataframe(dataframe)
Member

Why is this done on a subset level here? I would do it once on the dataframe level.

@@ -39,13 +39,15 @@ def __init__(
input_manifest_path: t.Union[str, Path],
output_manifest_path: t.Union[str, Path],
metadata: t.Dict[str, t.Any],
user_arguments: t.Dict[str, Argument],
user_arguments: t.Dict[str, t.Any],
output_partition_size: t.Optional[str] = "250MB",
Member

Shouldn't this be added to the parser as well, so it's extracted by from_args?

parameters.
"""

def _validate_partition_size_arg(file_size):
Member

Can we add this as a type when registering it on the parser in the Executor class?
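A minimal sketch of what registering the validator as an argparse type could look like (illustrative only; the accepted size formats and argument name here are assumptions, not the actual Fondant implementation):

```python
import argparse
import re

def partition_size(value: str) -> str:
    """argparse type hook: accept the literal 'disable' or a size
    string such as '250MB'; reject anything else at parse time."""
    if value == "disable" or re.fullmatch(r"\d+(KB|MB|GB)", value):
        return value
    raise argparse.ArgumentTypeError(
        f"invalid partition size {value!r}; expected e.g. '250MB' or 'disable'"
    )

parser = argparse.ArgumentParser()
parser.add_argument("--output_partition_size", type=partition_size, default="250MB")
```

This way, an invalid value fails immediately with a clear parser error instead of surfacing later inside the component.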

Comment on lines +103 to +104
input_partition_rows=100,
output_partition_size="10MB",
Contributor

Any reason the input partitions are specified in terms of rows and the output partitions in terms of size?

Contributor Author

The output partition size ensures that the written partitions are small and can be easily loaded by the next component.

The input is defined in rows so you can easily iterate on it if you run into out-of-memory issues, for example when retrieving 100 images from URLs. It's more intuitive to lower that number (to 10 rows, for example) than to change the size of the input partitions.

Contributor

Yeah, I do feel using the number of rows is way more intuitive than size.

Collaborator

@GeorgesLorre GeorgesLorre left a comment


Nice work! Looking forward to seeing this in action.

@PhilippeMoussalli PhilippeMoussalli merged commit 56b8326 into main Jul 25, 2023
@PhilippeMoussalli PhilippeMoussalli deleted the introduce-repartioning branch July 25, 2023 11:00
satishjasthi pushed a commit to satishjasthi/fondant that referenced this pull request Jul 26, 2023
Hakimovich99 pushed a commit that referenced this pull request Oct 16, 2023