Split dataframe to subsets #58
Conversation
fondant/dataset.py
Outdated
# Add the output subset to the manifest
manifest_fields = [
    (field.name, Type[field.type.name]) for field in subset.fields.values()
]
self.manifest.add_subset(subset_name, fields=manifest_fields)
The `Dataset` class should be a wrapper around an immutable `Manifest` (see discussion here). The `evolve` method currently takes care of updating the manifest (adding subsets based on the output subsets of the component spec). cc @RobbeSneyders
Good call, I think I'll resolve this when merging with #56. Maybe let's keep the focus of this PR on the splitting logic with Dask.
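For illustration, here is a minimal sketch of the immutable-manifest idea discussed above. The `Manifest` and component spec shapes are hypothetical, not the actual fondant API; the point is only that evolving returns a new manifest with the component's output subsets added rather than mutating the existing one.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class Manifest:
    """Hypothetical immutable manifest; evolving returns a new instance."""

    subsets: dict = field(default_factory=dict)

    def evolve(self, component_spec) -> "Manifest":
        # Add the component's output subsets without mutating the original manifest.
        new_subsets = dict(self.subsets)
        for name, subset in component_spec.output_subsets.items():
            new_subsets[name] = [(f.name, f.type) for f in subset.fields.values()]
        return replace(self, subsets=new_subsets)
```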
fondant/component.py
Outdated
dataset.add_index(df)
dataset.add_subsets(df, self.spec)
index_task = dataset.get_upload_index_task(df)
subset_tasks = dataset.get_upload_subsets_task(df, self.spec)
I would prefer to keep these tasks within the `Dataset` class. That way we only need to implement different `Dataset` classes if we want to support different frameworks.
Do we need to turn the merging of subsets into a dataframe into a task as well? Then we can include it in the task list and let Dask figure out how to handle it.
Makes sense, I've updated it accordingly. The only small downside is that we're now handling the writing of the index and all of the subsets separately, but that shouldn't introduce a major performance hit.
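As a rough sketch of the pattern (hypothetical helper name and column naming, not the actual fondant code): writing the index and each subset can each produce a delayed task via `to_parquet(..., compute=False)`, and a single `dask.compute` call then lets Dask execute them in parallel.

```python
import dask
import dask.dataframe as dd


def write_dataset(df: dd.DataFrame, index_path: str, subset_paths: dict) -> None:
    # Collect the index write and every subset write as delayed tasks.
    tasks = [dd.to_parquet(df[["id", "source"]], index_path, compute=False)]
    for subset_name, path in subset_paths.items():
        # Assumes subset columns are prefixed with the subset name.
        subset_columns = [col for col in df.columns if col.startswith(subset_name)]
        tasks.append(dd.to_parquet(df[subset_columns], path, compute=False))
    # One compute call lets Dask schedule all writes in parallel.
    dask.compute(*tasks)
```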
For testing you can use https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html to write out the subsets temporarily.
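For example, a minimal pytest sketch using the `tmp_path` fixture (the test name and toy data are illustrative):

```python
import dask.dataframe as dd
import pandas as pd


def test_write_index_subset(tmp_path):
    # tmp_path is a unique temporary directory (pathlib.Path) provided by pytest.
    df = dd.from_pandas(
        pd.DataFrame({"id": ["1", "2"], "source": ["seed", "seed"]}), npartitions=1
    )
    out_dir = tmp_path / "index"
    df.to_parquet(str(out_dir))

    # Read the subset back to verify what was written.
    written = dd.read_parquet(str(out_dir))
    assert len(written) == 2
```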
#56 has been merged. Can you rebase or merge so the conflicts are resolved? That will make it easier to review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @PhilippeMoussalli! General approach looks good to me. Left some comments :)
Force-pushed from d4932de to d4f852f (compare)
What exactly changed in the parquet files?
I think the operation you defined returns a lazy dataframe, so no need to define a task there. I changed the `source` data type from integer to string since that's what we currently expect.
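To illustrate the laziness (with toy in-memory data standing in for the real subsets): merging subsets and casting `source` only build up the Dask task graph; nothing runs until a compute or write is triggered.

```python
import dask.dataframe as dd
import pandas as pd

# Toy stand-ins for the index and a subset; real data would come from parquet.
index_df = dd.from_pandas(
    pd.DataFrame({"id": ["1", "2"], "source": ["seed", "seed"]}), npartitions=1
)
captions_df = dd.from_pandas(
    pd.DataFrame({"id": ["1", "2"], "source": ["seed", "seed"], "captions_text": ["a", "b"]}),
    npartitions=1,
)

# Merging subsets returns a lazy Dask dataframe: no work happens yet.
merged = index_df.merge(captions_df, on=["id", "source"])

# Casting `source` to string is also a lazy graph operation.
merged["source"] = merged["source"].astype(str)

# Computation only runs when explicitly triggered, e.g. by .compute() or a write.
print(merged.compute())
```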
Args:
    df: The output Dask dataframe returned by the user.
"""
remote_path = self.manifest.index.location
index_columns = list(self.manifest.index.fields.keys())

# load index dataframe
index_df = df[index_columns]
Does this `index_df` have an index? Maybe we need to call `.set_index()`?
We're using both the `id` and `source` as index. It does not seem like Dask supports a multi-index though: https://dask.discourse.group/t/everything-about-multiindex-in-dask/593/2. We might want to reconsider having the index be a string that contains both the source and id.
We could just index on `id` since it is better than no index (this is also what happens in the test data, see split.py). I'll create a ticket to solve multi-index.
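A small sketch of that approach with toy data: since Dask has no multi-index, the index dataframe keeps `source` as a regular column and only sets `id` as the index.

```python
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(
    pd.DataFrame({"id": ["1", "2"], "source": ["seed", "seed"], "captions_text": ["a", "b"]}),
    npartitions=1,
)

index_columns = ["id", "source"]

# No multi-index support in Dask, so index on `id` only and keep `source` as a column.
index_df = df[index_columns].set_index("id")
```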
Alright, added it to both the index and the subsets.
Nice work!
This PR optimizes the execution of Dask transformations and writes by creating delayed tasks that are executed in parallel. This is mainly to speed up writing many different subsets at the same time. I also added more documentation for the `Dataset` class. The changes did require some refactoring.