
Support for pandas DataFrame subclasses #52

Open
jorisvandenbossche opened this issue Jun 21, 2021 · 2 comments

Comments

@jorisvandenbossche (Member)

When dask uses partd for e.g. shuffle operations, the dataframes always come back as a plain pandas.DataFrame, even if a subclass was stored (xref geopandas/dask-geopandas#59 (comment)).

For example:

import geopandas
gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))

import partd
# dask.dataframe shuffle operations use PandasBlocks
p = partd.PandasBlocks(partd.Dict())

p.append({"gdf": gdf})
res = p.get("gdf")

>>> type(gdf)
geopandas.geodataframe.GeoDataFrame
>>> type(res)
pandas.core.frame.DataFrame

To be able to use dask's shuffle operations with dask_geopandas, which uses a pandas subclass as the partition type, the subclass should be preserved in the partd roundtrip (or are there other ways to override / dispatch this operation in dask?).
I was wondering how other dask.dataframe subclasses handle this, but e.g. dask_cudf doesn't seem to support "disk"-based shuffling.

@jorisvandenbossche (Member, Author)

The pandas.DataFrame is recreated here:

partd/partd/pandas.py, lines 194 to 203 at 9c9ba0a:

def deserialize(bytes):
    """ Deserialize and decompress bytes back to a pandas DataFrame """
    frames = list(framesplit(bytes))
    headers = pickle.loads(frames[0])
    bytes = frames[1:]
    axes = [index_from_header_bytes(headers[0], bytes[0]),
            index_from_header_bytes(headers[1], bytes[1])]
    blocks = [block_from_header_bytes(h, b)
              for (h, b) in zip(headers[2:], bytes[2:])]
    return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))

Currently only the actual underlying data is stored, so when deserializing I don't think partd knows anything about the original class. If we want to change that, it would need to start storing some additional information.
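
To make that idea concrete, here is a rough sketch (not partd's actual format; the function names are hypothetical) of what storing and using one extra piece of information could look like:

import pickle
import pandas as pd

def serialize_frame_type(df):
    # Hypothetical extra header: remember the concrete class of the stored
    # frame (e.g. geopandas.GeoDataFrame) next to the existing headers.
    return pickle.dumps(type(df))

def reconstruct_frame(plain_df, type_bytes):
    # On deserialization, rebuild the original subclass from the plain
    # pandas.DataFrame that partd currently produces.
    cls = pickle.loads(type_bytes)
    return plain_df if cls is pd.DataFrame else cls(plain_df)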

An alternative might be to tackle this on the dask side, and ensure the retrieved part is of the same type as meta (e.g. one could do something like res = meta._constructor(res) at https://github.com/dask/dask/blob/8aea537d925b794a94f828d35211a5da05ad9dce/dask/dataframe/shuffle.py#L740-L744).
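
For illustration only, a minimal sketch of that dask-side coercion (the helper name is made up; the real shuffle code in dask/dataframe/shuffle.py is more involved):

def coerce_to_meta_type(res, meta):
    # If partd handed back a plain pandas.DataFrame but the expected
    # partition type (meta) is a subclass, rebuild the part with the
    # subclass constructor so downstream tasks see the right type again.
    if type(res) is not type(meta):
        res = meta._constructor(res)
    return res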

@jrbourbeau (Member)

cc @madsbk @quasiben for their GPU-shuffle connection
