Skip to content
/ cudf Public
forked from rapidsai/cudf

Commit

Permalink
Expunge NamedColumn
Browse files Browse the repository at this point in the history
Everything in the expression evaluation now operates on columns
without names. DataFrame construction takes either a mapping from
string-valued names to columns, or a sequence of pairs of names and
columns.

This removes some duplicate code in the NamedColumn class (by removing
it) where we had to fight the inheritance hierarchy.

- Closes rapidsai#16272
  • Loading branch information
wence- committed Oct 1, 2024
1 parent 04baa22 commit 97e3568
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 272 deletions.
4 changes: 2 additions & 2 deletions python/cudf_polars/cudf_polars/containers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from __future__ import annotations

__all__: list[str] = ["DataFrame", "Column", "NamedColumn"]
__all__: list[str] = ["DataFrame", "Column"]

from cudf_polars.containers.column import Column, NamedColumn
from cudf_polars.containers.column import Column
from cudf_polars.containers.dataframe import DataFrame
57 changes: 1 addition & 56 deletions python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import polars as pl

__all__: list[str] = ["Column", "NamedColumn"]
__all__: list[str] = ["Column"]


class Column:
Expand Down Expand Up @@ -217,58 +217,3 @@ def nan_count(self) -> int:
)
).as_py()
return 0


class NamedColumn(Column):
"""A column with a name."""

name: str

def __init__(
self,
column: plc.Column,
name: str,
*,
is_sorted: plc.types.Sorted = plc.types.Sorted.NO,
order: plc.types.Order = plc.types.Order.ASCENDING,
null_order: plc.types.NullOrder = plc.types.NullOrder.BEFORE,
) -> None:
super().__init__(
column, is_sorted=is_sorted, order=order, null_order=null_order
)
self.name = name

def copy(self, *, new_name: str | None = None) -> Self:
"""
A shallow copy of the column.
Parameters
----------
new_name
Optional new name for the copied column.
Returns
-------
New column sharing data with self.
"""
return type(self)(
self.obj,
self.name if new_name is None else new_name,
is_sorted=self.is_sorted,
order=self.order,
null_order=self.null_order,
)

def mask_nans(self) -> Self:
"""Return a shallow copy of self with nans masked out."""
# Annoying, the inheritance is not right (can't call the
# super-type mask_nans), but will sort that by refactoring
# later.
if plc.traits.is_floating_point(self.obj.type()):
old_count = self.obj.null_count()
mask, new_count = plc.transform.nans_to_nulls(self.obj)
result = type(self)(self.obj.with_mask(mask, new_count), self.name)
if old_count == new_count:
return result.sorted_like(self)
return result
return self.copy()
99 changes: 50 additions & 49 deletions python/cudf_polars/cudf_polars/containers/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,33 @@

import polars as pl

from cudf_polars.containers.column import NamedColumn
from cudf_polars.containers import Column
from cudf_polars.utils import dtypes

if TYPE_CHECKING:
from collections.abc import Mapping, Sequence, Set
from collections.abc import Iterable, Mapping, Sequence, Set

from typing_extensions import Self

from cudf_polars.containers import Column


__all__: list[str] = ["DataFrame"]


class DataFrame:
"""A representation of a dataframe."""

columns: list[NamedColumn]
column_map: dict[str, Column]
table: plc.Table

def __init__(self, columns: Sequence[NamedColumn]) -> None:
self.columns = list(columns)
self._column_map = {c.name: c for c in self.columns}
self.table = plc.Table([c.obj for c in columns])
def __init__(
self, columns: Iterable[tuple[str, Column]] | Mapping[str, Column]
) -> None:
self.column_map = dict(columns)
self.table = plc.Table([c.obj for c in self.column_map.values()])

def copy(self) -> Self:
"""Return a shallow copy of self."""
return type(self)([c.copy() for c in self.columns])
return type(self)((name, c.copy()) for name, c in self.column_map.items())

def to_polars(self) -> pl.DataFrame:
"""Convert to a polars DataFrame."""
Expand All @@ -51,42 +50,47 @@ def to_polars(self) -> pl.DataFrame:
# https://github.com/pola-rs/polars/issues/11632
# To guarantee we produce correct names, we therefore
# serialise with names we control and rename with that map.
name_map = {f"column_{i}": c.name for i, c in enumerate(self.columns)}
name_map = {f"column_{i}": name for i, name in enumerate(self.column_map)}
table: pa.Table = plc.interop.to_arrow(
self.table,
[plc.interop.ColumnMetadata(name=name) for name in name_map],
)
df: pl.DataFrame = pl.from_arrow(table)
return df.rename(name_map).with_columns(
*(
pl.col(c.name).set_sorted(
descending=c.order == plc.types.Order.DESCENDING
pl.col(name).set_sorted(
descending=column.order == plc.types.Order.DESCENDING
)
if c.is_sorted
else pl.col(c.name)
for c in self.columns
if column.is_sorted
else pl.col(name)
for name, column in self.column_map.items()
)
)

@cached_property
def column_names_set(self) -> frozenset[str]:
"""Return the column names as a set."""
return frozenset(c.name for c in self.columns)
return frozenset(self.column_map)

@cached_property
def column_names(self) -> list[str]:
"""Return a list of the column names."""
return [c.name for c in self.columns]
return list(self.column_map)

@cached_property
def columns(self) -> list[Column]:
"""Return a list of the columns."""
return list(self.column_map.values())

@cached_property
def num_columns(self) -> int:
"""Number of columns."""
return len(self.columns)
return len(self.column_map)

@cached_property
def num_rows(self) -> int:
"""Number of rows."""
return 0 if len(self.columns) == 0 else self.table.num_rows()
return 0 if len(self.column_map) == 0 else self.table.num_rows()

@classmethod
def from_polars(cls, df: pl.DataFrame) -> Self:
Expand All @@ -111,12 +115,8 @@ def from_polars(cls, df: pl.DataFrame) -> Self:
# No-op if the schema is unchanged.
d_table = plc.interop.from_arrow(table.cast(schema))
return cls(
[
NamedColumn(column, h_col.name).copy_metadata(h_col)
for column, h_col in zip(
d_table.columns(), df.iter_columns(), strict=True
)
]
(h_col.name, Column(column).copy_metadata(h_col))
for column, h_col in zip(d_table.columns(), df.iter_columns(), strict=True)
)

@classmethod
Expand All @@ -143,12 +143,7 @@ def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self:
"""
if table.num_columns() != len(names):
raise ValueError("Mismatching name and table length.")
return cls(
[
NamedColumn(c, name)
for c, name in zip(table.columns(), names, strict=True)
]
)
return cls(zip(names, map(Column, table.columns()), strict=True))

def sorted_like(
self, like: DataFrame, /, *, subset: Set[str] | None = None
Expand All @@ -175,13 +170,15 @@ def sorted_like(
if like.column_names != self.column_names:
raise ValueError("Can only copy from identically named frame")
subset = self.column_names_set if subset is None else subset
self.columns = [
c.sorted_like(other) if c.name in subset else c
for c, other in zip(self.columns, like.columns, strict=True)
]
self.column_map = {
name: column.sorted_like(other) if name in subset else column
for (name, column), other in zip(
self.column_map.items(), like.column_map.values(), strict=True
)
}
return self

def with_columns(self, columns: Sequence[NamedColumn]) -> Self:
def with_columns(self, columns: Iterable[tuple[str, Column]]) -> Self:
"""
Return a new dataframe with extra columns.
Expand All @@ -198,36 +195,40 @@ def with_columns(self, columns: Sequence[NamedColumn]) -> Self:
-----
If column names overlap, newer names replace older ones.
"""
columns = list(
{c.name: c for c in itertools.chain(self.columns, columns)}.values()
)
return type(self)(columns)
return type(self)(itertools.chain(self.column_map.items(), columns))

def discard_columns(self, names: Set[str]) -> Self:
"""Drop columns by name."""
return type(self)([c for c in self.columns if c.name not in names])
return type(self)(
(name, column)
for name, column in self.column_map.items()
if name not in names
)

def select(self, names: Sequence[str]) -> Self:
"""Select columns by name returning DataFrame."""
want = set(names)
if not want.issubset(self.column_names_set):
raise ValueError("Can't select missing names")
return type(self)([self._column_map[name] for name in names])
return type(self)((name, self.column_map[name]) for name in names)

def replace_columns(self, *columns: NamedColumn) -> Self:
def replace_columns(self, *columns: tuple[str, Column]) -> Self:
"""Return a new dataframe with columns replaced by name."""
new = {c.name: c for c in columns}
new = dict(columns)
if not set(new).issubset(self.column_names_set):
raise ValueError("Cannot replace with non-existing names")
return type(self)([new.get(c.name, c) for c in self.columns])
return type(self)(self.column_map | new)

def rename_columns(self, mapping: Mapping[str, str]) -> Self:
"""Rename some columns."""
return type(self)([c.copy(new_name=mapping.get(c.name)) for c in self.columns])
return type(self)(
(mapping.get(name, name), column)
for name, column in self.column_map.items()
)

def select_columns(self, names: Set[str]) -> list[NamedColumn]:
def select_columns(self, names: Set[str]) -> list[Column]:
"""Select columns by name."""
return [c for c in self.columns if c.name in names]
return [c for name, c in self.column_map.items() if name in names]

def filter(self, mask: Column) -> Self:
"""Return a filtered table given a mask."""
Expand Down
17 changes: 5 additions & 12 deletions python/cudf_polars/cudf_polars/dsl/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from polars.exceptions import InvalidOperationError
from polars.polars import _expr_nodes as pl_expr

from cudf_polars.containers import Column, NamedColumn
from cudf_polars.containers import Column
from cudf_polars.utils import dtypes, sorting

if TYPE_CHECKING:
Expand Down Expand Up @@ -313,7 +313,7 @@ def evaluate(
*,
context: ExecutionContext = ExecutionContext.FRAME,
mapping: Mapping[Expr, Column] | None = None,
) -> NamedColumn:
) -> tuple[str, Column]:
"""
Evaluate this expression given a dataframe for context.
Expand All @@ -328,21 +328,14 @@ def evaluate(
Returns
-------
NamedColumn attaching a name to an evaluated Column
tuple of name and evaluated Column
See Also
--------
:meth:`Expr.evaluate` for details, this function just adds the
name to a column produced from an expression.
"""
obj = self.value.evaluate(df, context=context, mapping=mapping)
return NamedColumn(
obj.obj,
self.name,
is_sorted=obj.is_sorted,
order=obj.order,
null_order=obj.null_order,
)
return self.name, self.value.evaluate(df, context=context, mapping=mapping)

def collect_agg(self, *, depth: int) -> AggInfo:
"""Collect information about aggregations in groupbys."""
Expand Down Expand Up @@ -428,7 +421,7 @@ def do_evaluate(
mapping: Mapping[Expr, Column] | None = None,
) -> Column:
"""Evaluate this expression given a dataframe for context."""
return df._column_map[self.name]
return df.column_map[self.name]

def collect_agg(self, *, depth: int) -> AggInfo:
"""Collect information about aggregations in groupbys."""
Expand Down
Loading

0 comments on commit 97e3568

Please sign in to comment.