Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic backend dispatching to dask-expr #728

Merged
merged 18 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions dask_expr/_backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

import pandas as pd
from dask.backends import CreationDispatch
from dask.dataframe.backends import DataFrameBackendEntrypoint

from dask_expr._dispatch import get_collection_type


class DXCreationDispatch(CreationDispatch):
"""Dask-expressions version of CreationDispatch"""

# TODO Remove after https://github.com/dask/dask/pull/10794
def dispatch(self, backend: str):
from dask.backends import detect_entrypoints

try:
impl = self._lookup[backend]
except KeyError:
entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason we cannot use CreationDispatch directly is the fact that dask_cudf is already using the "dask.dataframe.backends" path to expose the "cudf" backend entrypoint for dask.dataframe.

We can just use CreationDispatch directly after dask/dask#10794 makes it possible to modify the entrypoint path upon initialization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, removed this now that dask/dask#10794 is in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that - I should probably wait for the next dask/dask release to remove DXCreationDispatch

if backend in entrypoints:
return self.register_backend(backend, entrypoints[backend].load()())
else:
return impl
raise ValueError(f"No backend dispatch registered for {backend}")


dataframe_creation_dispatch = DXCreationDispatch(
module_name="dataframe",
default="pandas",
entrypoint_class=DataFrameBackendEntrypoint,
name="dataframe_creation_dispatch",
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we want to introduce a new CreationDispatch object for dask-expr, because we want to distinguish "legacy" dask-dataframe creation functions from newer expression-based creation functions (to be discovered at the "dask_expr.dataframe.backend" entrypoint path instead of "dask.dataframe.backend").

In both legacy and expr-based code, the same "dask.dataframe.backend" config value is used to specify the default backend.



class PandasBackendEntrypoint(DataFrameBackendEntrypoint):
"""Pandas-Backend Entrypoint Class for Dask-Expressions

Note that all DataFrame-creation functions are defined
and registered 'in-place'.
"""

@classmethod
def to_backend_dispatch(cls):
from dask.dataframe.dispatch import to_pandas_dispatch

return to_pandas_dispatch

@classmethod
def to_backend(cls, data, **kwargs):
if isinstance(data._meta, (pd.DataFrame, pd.Series, pd.Index)):
# Already a pandas-backed collection
return data
return data.map_partitions(cls.to_backend_dispatch(), **kwargs)


dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint())


@get_collection_type.register(pd.Series)
def get_collection_type_series(_):
from dask_expr._collection import Series

return Series


@get_collection_type.register(pd.DataFrame)
def get_collection_type_dataframe(_):
from dask_expr._collection import DataFrame

return DataFrame


@get_collection_type.register(pd.Index)
def get_collection_type_index(_):
from dask_expr._collection import Index

return Index


@get_collection_type.register(object)
def get_collection_type_object(_):
from dask_expr._collection import Scalar

return Scalar


######################################
# cuDF: Pandas Dataframes on the GPU #
######################################


@get_collection_type.register_lazy("cudf")
def _register_cudf():
import dask_cudf # noqa: F401
26 changes: 13 additions & 13 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
has_parallel_type,
is_arraylike,
is_dataframe_like,
is_index_like,
is_series_like,
meta_warning,
new_dd_object,
Expand Down Expand Up @@ -55,12 +54,15 @@
from pandas.api.types import is_timedelta64_dtype
from tlz import first

import dask_expr._backends # noqa: F401
from dask_expr import _expr as expr
from dask_expr._align import AlignPartitions
from dask_expr._backends import dataframe_creation_dispatch
from dask_expr._categorical import CategoricalAccessor, Categorize, GetCategories
from dask_expr._concat import Concat
from dask_expr._datetime import DatetimeAccessor
from dask_expr._describe import DescribeNonNumeric, DescribeNumeric
from dask_expr._dispatch import get_collection_type
from dask_expr._expr import (
BFill,
Diff,
Expand Down Expand Up @@ -1529,9 +1531,14 @@ def to_backend(self, backend: str | None = None, **kwargs):
-------
DataFrame, Series or Index
"""
from dask.dataframe.io import to_backend
from dask_expr._backends import dataframe_creation_dispatch

return to_backend(self.to_dask_dataframe(), backend=backend, **kwargs)
phofl marked this conversation as resolved.
Show resolved Hide resolved
# Get desired backend
backend = backend or dataframe_creation_dispatch.backend
# Check that "backend" has a registered entrypoint
backend_entrypoint = dataframe_creation_dispatch.dispatch(backend)
# Call `DataFrameBackendEntrypoint.to_backend`
return backend_entrypoint.to_backend(self, **kwargs)

def dot(self, other, meta=no_default):
if not isinstance(other, FrameBase):
Expand Down Expand Up @@ -1931,7 +1938,7 @@ def __getattr__(self, key):
# Check if key is in columns if key
# is not a normal attribute
if key in self.expr._meta.columns:
return Series(self.expr[key])
return new_collection(self.expr[key])
raise err
except AttributeError:
# Fall back to `BaseFrame.__getattr__`
Expand Down Expand Up @@ -3098,17 +3105,9 @@ def __array__(self):

def new_collection(expr):
"""Create new collection from an expr"""

meta = expr._meta
expr._name # Ensure backend is imported
if is_dataframe_like(meta):
return DataFrame(expr)
elif is_series_like(meta):
return Series(expr)
elif is_index_like(meta):
return Index(expr)
else:
return Scalar(expr)
return get_collection_type(meta)(expr)
Comment on lines 3164 to +3168
Copy link
Member Author

@rjzamora rjzamora Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we have been using new_collection from the beginning, because we knew we needed to make this change to support other backends (i.e. cudf) long term.

The behavior of new_collection is now consistent with the behavior of new_dd_object in dask.dataframe.



def optimize(collection, fuse=True):
Expand Down Expand Up @@ -3175,6 +3174,7 @@ def from_graph(*args, **kwargs):
return new_collection(FromGraph(*args, **kwargs))


@dataframe_creation_dispatch.register_inplace("pandas")
def from_dict(
data,
npartitions,
Expand Down
5 changes: 5 additions & 0 deletions dask_expr/_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from dask.utils import Dispatch

get_collection_type = Dispatch("get_collection_type")
Copy link
Member Author

@rjzamora rjzamora Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we will need many dispatch functions here. So, we could also define this in dask.datafame.dispatch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now publicly exposed as dask_expr.get_collection_type. This can always be moved to dask.datafame.dispatch in the future.

Loading