feat(datasets): Add option to async load and save in PartitionedDatasets #696
Conversation
Signed-off-by: puneeter <puneet.saini@quantumblack.com>
Hi @puneeter, can you please provide a description and any relevant development notes on the PR? This will make it easier for the team to review.

I updated the description. Please let me know if it needs any refactoring.
```python
async def load_partition(partition: str) -> None:
    kwargs = deepcopy(self._dataset_config)
    kwargs[self._filepath_arg] = self._join_protocol(partition)
    dataset = self._dataset_type(**kwargs)  # type: ignore
    partition_id = self._path_to_partition(partition)
    partitions[partition_id] = dataset.load

await asyncio.gather(
    *[load_partition(partition) for partition in self._list_partitions()]
)
```
If I understand correctly, there's no actual I/O being performed here, right? Only the `partitions` dictionary is being populated. I don't see the need for the async helper and `asyncio.gather` here.

If anything, as a user I'd expect the async loaders to be available in my node function so that I can `await` them (provided that my node is asynchronous), use `asyncio.gather` myself, or use an `asyncio.TaskGroup`.
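To illustrate the point that nothing is actually awaited here: a plain synchronous loop builds the same mapping of partition id to lazy load callable. This is a standalone sketch with hypothetical names (`build_partitions`, `DummyDataset`), not the dataset's actual internals:

```python
def build_partitions(list_partitions, make_dataset, path_to_partition):
    """Synchronous equivalent of the async helper: no I/O happens here."""
    partitions = {}
    for partition in list_partitions():
        dataset = make_dataset(partition)  # cheap object construction, no I/O
        partitions[path_to_partition(partition)] = dataset.load  # not called yet
    return partitions


class DummyDataset:
    """Stands in for the configured dataset type; load runs only when invoked."""
    def __init__(self, path):
        self.path = path

    def load(self):
        return f"data:{self.path}"


parts = build_partitions(
    lambda: ["a.csv", "b.csv"],
    DummyDataset,
    lambda p: p.removesuffix(".csv"),
)
print(sorted(parts))  # ['a', 'b']
print(parts["a"]())   # 'data:a.csv'
```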
```yaml
my_partitioned_dataset:
  type: partitions.PartitionedDataset
  path: s3://my-bucket-name/path/to/folder
  ...
  use_async: True
```
```python
async def concat_partitions(partitioned_input: dict[str, Awaitable]) -> pd.DataFrame:
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for partition_key, partition_load_func in sorted(partitioned_input.items()):
            tasks.append(tg.create_task(partition_load_func()))
    result = pd.concat([task.result() for task in tasks], ignore_index=True, sort=True)
    return result
```
(not that I find this a particularly friendly DX, but it's more or less a continuation of our current approach https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-load)
What am I missing?
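Stripped of Kedro and pandas, the await-in-node pattern above reduces to the following runnable sketch. All names here are hypothetical (dummy loaders in place of dataset `load` callables), and `asyncio.gather` is used instead of `TaskGroup` for broader Python-version compatibility:

```python
import asyncio


def make_loader(value):
    """Dummy async loader standing in for a partition dataset's load."""
    async def load():
        await asyncio.sleep(0)  # placeholder for real async I/O
        return [value]
    return load


async def concat_partitions(partitioned_input):
    # Await all lazy loaders concurrently, in sorted partition-key order.
    keys = sorted(partitioned_input)
    parts = await asyncio.gather(*(partitioned_input[k]() for k in keys))
    # Flatten per-partition rows (stand-in for pd.concat).
    return [row for part in parts for row in part]


partitions = {"p2": make_loader(2), "p0": make_loader(0), "p1": make_loader(1)}
result = asyncio.run(concat_partitions(partitions))
print(result)  # [0, 1, 2]
```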
That makes sense. I am open to both options. Let me know if you want to revert the load method to its original definition. Happy to also update the documentation once we are aligned on the changes.
IIRC the original question was about using the async option of the Runner, and we found that the partitioned dataset only does async at the whole-dataset level, which is not efficient.

I think we need to think about this separately for save and load.

For load, the logic is actually implemented in the node. Can we already do this today with the async node @astrojuanlu showed? If so, it seems we don't need to change anything for load in this PR.

Save is where we actually need changes for the partitioned dataset, especially lazy saving. I think it is reasonable to use async by default for save. This is not possible today because of how we list partitions and save them in a sync loop. We can only do async at the whole-partitioned-dataset level, not for the underlying datasets (using the runner's is_async).
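A sketch of what concurrent per-partition saving could look like, assuming the underlying dataset's save is blocking and each call is pushed onto a worker thread via `asyncio.to_thread`. All names (`blocking_save`, `save_partitions`, `sink`) are hypothetical, not the actual `PartitionedDataset` implementation:

```python
import asyncio


def blocking_save(partition_id, data, sink):
    # Stands in for the underlying dataset's synchronous save call.
    sink[partition_id] = data


async def save_partitions(partitions, sink):
    # Run each blocking save on a worker thread and wait for all of them,
    # instead of saving one partition at a time in a sync loop.
    await asyncio.gather(
        *(asyncio.to_thread(blocking_save, pid, data, sink)
          for pid, data in partitions.items())
    )


sink = {}
asyncio.run(save_partitions({"2024/a": 1, "2024/b": 2}, sink))
print(sorted(sink))  # ['2024/a', '2024/b']
```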
Is there any reason why we prefer doing this at the dataset level rather than in the runner? It seems like a common approach at the layer above is needed anyway to make it efficient.
For now I think this is to achieve consistency with synchronous `PartitionedDataset`s. Not sure what you have in mind for runners, but maybe we should discuss that separately? Unless you still see issues with the proposed approach.
Would need the team's help to point to the right documentation to be changed because of this change. Maybe:
Description

- Added an option to load and save `PartitionedDataset` asynchronously for the provided partitions, controlled via a `use_async` argument.

Development notes

- A `use_async` argument to the `PartitionedDataset` constructor is used to control the async load/save. Based on this argument, the `_save` and `_load` methods call different private functions.
- Added tests for `PartitionedDataset` by parameterizing the value for `use_async` using `@pytest.mark.parametrize("use_async", [True, False])`.

Checklist

- Updated the `RELEASE.md` file