
[WIP] Optimize image download component #288

Closed
wants to merge 23 commits into from

Conversation

PhilippeMoussalli
Contributor

PR that optimizes the image download component as proposed here:

https://docs.google.com/document/d/1Nv9gLe1uiD9mFt62LLJ1Z13cyIihNokOtY20jk9GEpY/edit#heading=h.9ucv3iu9w7zy

This implementation currently only optimizes this one component, but the goal is to move the optimization out of the components and into Fondant itself. Once this approach is validated for different scenarios, we can proceed to do so.

@PhilippeMoussalli PhilippeMoussalli self-assigned this Jul 11, 2023
@PhilippeMoussalli PhilippeMoussalli added Core Core framework Components Implementation of components labels Jul 11, 2023
@PhilippeMoussalli PhilippeMoussalli changed the title optimize image download component Optimize image download component Jul 11, 2023
Member

@RobbeSneyders RobbeSneyders left a comment

Thanks @PhilippeMoussalli!

Some questions that can hopefully help me build a better understanding 🙂

Did you run any benchmarks or use one of the profiling options provided by Dask to validate the changes?

@@ -123,8 +124,9 @@ def transform(
max_aspect_ratio=max_aspect_ratio,
)

# Remove duplicates from laion retrieval
# Remove duplicates from laion retrieval (global)
dataframe = dataframe.drop_duplicates()
Member

Shouldn't we do this in the laion retrieval components instead?

Contributor Author

Ideally yes, but I think the issue there is that we're working on a per-partition basis, since it's a pandas component. So we're unable to execute drop_duplicates on the whole dataset, afaik.

Member

Makes sense, but I don't think drop_duplicates is very performant unless we sort on the index first, which is probably something we should do after retrieving LAION anyway, since we're changing the ids.

Contributor Author

Would that sorting be global then, requiring repartitioning so that the duplicate ids are all clustered in one partition? Or is it per-partition?

Member

Global indeed, so they are clustered per partition. I think the drop_duplicates method requires a reshuffle step otherwise anyway. Have you generated a task graph of this component? That might actually be something interesting to log for each component.

Contributor

I was trying out the download images component for Datacomp and I had to remove drop_duplicates to get it to work

Contributor Author

Why is that? Can you elaborate?

Contributor

For me it resulted in `TypeError: unhashable type: 'numpy.ndarray'`

Contributor Author

Might be because you have a numpy array in your dataframe; arrays are considered mutable and not hashable. drop_duplicates() expects the elements to be hashable, and by default it operates on all the columns. You can specify that you only want to deduplicate on the id column by using the subset argument, as mentioned here.

I will also make the changes in this PR

dataframe = dataframe.drop_duplicates()
dataframe = dataframe.repartition(npartitions=os.cpu_count())
Member

@RobbeSneyders RobbeSneyders Jul 11, 2023

What if our data is larger than RAM x os.cpu_count()?

Maybe we should partition here based on size as well. If the size is sufficiently large, npartitions will be larger than os.cpu_count(). If the size is small, the optimization is less important.
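One possible way to combine both constraints, as a hypothetical helper (name, signature, and the 250MB default are assumptions, not existing Fondant code): take the larger of the size-based and worker-based partition counts.

```python
import math
import os

def choose_npartitions(total_bytes: int,
                       target_partition_bytes: int = 250 * 1024**2) -> int:
    """Hypothetical helper: partitions stay under the target size while
    every core still gets at least one partition of work."""
    by_size = math.ceil(total_bytes / target_partition_bytes)
    by_workers = os.cpu_count() or 1
    return max(by_size, by_workers)

# A 10 GB dataset needs 41 size-based partitions of <=250MB, so
# npartitions is at least 41 even on a machine with few cores.
print(choose_npartitions(10 * 1024**3))
```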

Contributor Author

Good point. I still think we should somehow consider both, especially if the operation per row takes a long time but the dataset is inherently small because it consists only of text data.

Member

If we can consider both, that would be ideal, but not sure if we can.

Contributor Author

@PhilippeMoussalli PhilippeMoussalli Jul 13, 2023

I have a new updated workflow that I think can mitigate the issue of having large data (also added it to the doc). That way we only repartition if needed, but the repartitions should still be smaller than RAM x os.cpu_count().

[diagram: updated repartitioning workflow]

There is still the issue of single partitions being larger than RAM. I proposed a way in which this can be tackled, but it would need more investigation; not sure if there is a clear-cut solution for that one.

Member

Can we actually know if n_partitions<n_workers at graph construction time?

We can make the first repartition, based on memory, dynamic: we use 250MB by default, but the user can define a lower number if they know that their component explodes the memory usage per row.

Contributor Author

> Can we actually know if n_partitions<n_workers at graph construction time?

Yes, maybe it's not clear, but this block would then be part of the transform component. I've updated the plot.

import os
import dask.dataframe as dd

df = dd.read_parquet()
n_partitions = df.npartitions
n_workers = os.cpu_count()
if n_partitions < n_workers:
    n_partitions = n_workers
df = df.repartition(npartitions=n_partitions)
return df  # dataframe returned to the user

> We can make the first repartition based on memory dynamic. Where we use 250Mb by default, but the user can define a lower number if they know that their component explodes the memory usage per row.

I think this approach could work, but it would involve quite a bit of trial and error, since you would probably only adjust it after your pipeline fails.

Member

Ok, the split between the load and transform component was not clear to me before. I thought this was all happening in the transform component. I think this makes sense indeed!

Member

Can you update this PR to reflect this flow?

@@ -145,7 +147,7 @@ def transform(

# Remove images that could not be fetched
dataframe = dataframe.dropna()

dataframe = dataframe.repartition(partition_size="250MB")
Member

What if the memory of a single partition after downloading the images is larger than RAM? Will it not lead to memory issues before this repartitioning is executed?

Contributor Author

My understanding is that the repartitioning should happen on the fly as the dataset is being created, not at the end, but I might be wrong. Would need to check this.

Contributor Author

I think you might be right: all partitions only get re-partitioned at the end, and before that they might be larger than RAM. The highlighted part is the partition that might be troublesome. Thanks to @shayorshay for helping with the interpretation.

[diagram: task graph with the potentially problematic partition highlighted]

@PhilippeMoussalli
Contributor Author

> Thanks @PhilippeMoussalli!
>
> Some questions that can hopefully help me build a better understanding 🙂
>
> Did you run any benchmarks or use one of the profiling options provided by Dask to validate the changes?

The only benchmarking I did so far was with Docker Desktop, where I checked that CPU usage was higher across the different cores. I didn't introduce the Dask profiling yet.

@PhilippeMoussalli PhilippeMoussalli changed the title Optimize image download component [WIP] Optimize image download component Jul 18, 2023
@PhilippeMoussalli
Contributor Author

Closing this PR in favor of #309

@PhilippeMoussalli
Contributor Author

Closed in favor of #309

PhilippeMoussalli added a commit that referenced this pull request Jul 25, 2023
PR that introduces the partitioning strategy discussed in #288 

1) The automatic behavior is as follows for all component types (Dask, pandas):
* The written dataframe is re-partitioned to 250MB
* The loaded dataframe is re-partitioned depending on the current number of partitions and workers

2) The behavior above can be overridden by the end user in case they want to implement their own custom logic; this is done at the ComponentOp level as an additional flag parameter that can be passed. See the docs added with this PR for more details.

I will handle adding the diagnostic tools and optimizing the downloader
component in a separate PR.
satishjasthi pushed a commit to satishjasthi/fondant that referenced this pull request Jul 26, 2023
Hakimovich99 pushed a commit that referenced this pull request Oct 16, 2023