Skip to content
This repository has been archived by the owner on Nov 2, 2023. It is now read-only.

Commit

Permalink
Add support for polars v0.14.0 (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakobGM authored Aug 13, 2022
1 parent 9ff2bc0 commit 8fc784c
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 42 deletions.
12 changes: 6 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ keywords = ["validation", "dataframe"]
[tool.poetry.dependencies]
python = "^3.7"
pydantic = ">=1.7.0"
polars = ">=0.13.18"
polars = ">=0.14.0"
# Required for typing.Literal in python3.7
typing-extensions = "*"
pandas = {version = "*", optional = true, python = "^3.8"}
Expand Down
3 changes: 2 additions & 1 deletion src/patito/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

from patito import exceptions, sql
from patito.exceptions import ValidationError
from patito.polars import DataFrame
from patito.polars import DataFrame, LazyFrame
from patito.pydantic import Field, Model

_DUCKDB_AVAILABLE = False
__all__ = [
"DataFrame",
"Expr",
"Field",
"LazyFrame",
"Model",
"Series",
"ValidationError",
Expand Down
17 changes: 12 additions & 5 deletions src/patito/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from typing_extensions import Literal

from patito import sql
from patito.polars import DataFrame
from patito.pydantic import Model, ModelType

try:
Expand All @@ -44,7 +45,13 @@

# Types which can be used to instantiate a DuckDB Relation object
RelationSource = Union[
"pd.DataFrame", pl.DataFrame, Path, str, "duckdb.DuckDBPyRelation", "Relation"
DataFrame,
pl.DataFrame,
"pd.DataFrame",
Path,
str,
"duckdb.DuckDBPyRelation",
"Relation",
]

# Used to refer to type(self) in Relation methods which preserve the type.
Expand Down Expand Up @@ -675,7 +682,7 @@ def to_pandas(self) -> "pd.DataFrame":
"""Return a pandas DataFrame representation of relation object."""
return cast("pd.DataFrame", self._relation.to_df())

def to_df(self) -> pl.DataFrame:
def to_df(self) -> DataFrame:
"""Return a polars DataFrame representation of relation object."""
# Here we do a star-select to work around certain weird issues with DuckDB
self._relation = self._relation.project("*")
Expand All @@ -691,7 +698,7 @@ def to_df(self) -> pl.DataFrame:
schema = schema.set(index, dict_field)
arrow_table = arrow_table.cast(schema)
try:
return cast(pl.DataFrame, pl.from_arrow(arrow_table))
return DataFrame._from_arrow(arrow_table)
except pa.ArrowInvalid: # pragma: no cover
# Empty relations with enum columns can sometimes produce errors.
# As a last-ditch effort, we convert such columns to VARCHAR.
Expand All @@ -703,15 +710,15 @@ def to_df(self) -> pl.DataFrame:
]
non_enum_relation = self._relation.project(", ".join(casted_columns))
arrow_table = non_enum_relation.to_arrow_table()
return cast(pl.DataFrame, pl.from_arrow(arrow_table))
return DataFrame._from_arrow(arrow_table)

def to_series(self) -> pl.Series:
    """
    Return the relation as a polars Series.

    The relation must contain exactly one column for this conversion to be
    well-defined.

    Returns:
        A polars Series named after the relation's single column.

    Raises:
        TypeError: If the relation contains more or less than one column.
    """
    if len(self._relation.columns) != 1:
        raise TypeError(
            f"{self.__class__.__name__}.to_series() was invoked on a relation with "
            f"{len(self._relation.columns)} columns, while exactly 1 is required!"
        )
    # Round-trip through arrow so that the patito DataFrame subclass is used.
    dataframe = DataFrame._from_arrow(self._relation.to_arrow_table())
    return dataframe.to_series(index=0).alias(name=self.columns[0])

def union(self: RelationType, other: RelationSource) -> RelationType:
Expand Down
150 changes: 130 additions & 20 deletions src/patito/polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Generic,
List,
Optional,
Sequence,
Type,
TypeVar,
Union,
Expand All @@ -20,11 +21,80 @@
from patito.exceptions import MultipleRowsReturned, RowDoesNotExist

if TYPE_CHECKING:
import numpy as np

from patito.pydantic import Model


DF = TypeVar("DF", bound="DataFrame")
LDF = TypeVar("LDF", bound="LazyFrame")
ModelType = TypeVar("ModelType", bound="Model")
OtherModelType = TypeVar("OtherModelType", bound="Model")


class LazyFrame(pl.LazyFrame, Generic[ModelType]):
    """LazyFrame subclass associated with the patito DataFrame class."""

    # Patito model used to validate/annotate the dataframe produced by collect().
    model: Type[ModelType]

    @classmethod
    def _construct_lazyframe_model_class(
        cls: Type[LDF], model: Optional[Type[ModelType]]
    ) -> Type[LazyFrame[ModelType]]:
        """
        Return a custom LazyFrame subclass with ``LazyFrame.model`` set.

        The resulting class behaves as if ``DataFrame.set_model(model)`` is
        implicitly invoked when the query is collected.

        Args:
            model: A patito model used to annotate the final dataframe.
                When None is given, the class itself is returned unchanged.

        Returns:
            A custom LazyFrame subclass where ``model`` has been "hard-coded"
            to the given patito model.
        """
        if model is None:
            return cls
        subclass_name = f"{model.schema()['title']}LazyFrame"
        return type(
            subclass_name,
            (cls,),  # type: ignore
            {"model": model},
        )

    def collect(
        self,
        type_coercion: bool = True,
        predicate_pushdown: bool = True,
        projection_pushdown: bool = True,
        simplify_expression: bool = True,
        string_cache: bool = False,
        no_optimization: bool = False,
        slice_pushdown: bool = True,
    ) -> "DataFrame[ModelType]":  # noqa: DAR101, DAR201
        """
        Collect the lazy query into a patito DataFrame.

        See the documentation of polars.LazyFrame.collect for a full
        description of the parameters.
        """
        collected = super().collect(
            type_coercion=type_coercion,
            predicate_pushdown=predicate_pushdown,
            projection_pushdown=projection_pushdown,
            simplify_expression=simplify_expression,
            string_cache=string_cache,
            no_optimization=no_optimization,
            slice_pushdown=slice_pushdown,
        )
        # Re-attach the model (when one has been set) to the eager result.
        model = getattr(self, "model", None)
        if model is None:
            dataframe_class = DataFrame
        else:
            dataframe_class = DataFrame._construct_dataframe_model_class(model=model)
        return dataframe_class._from_pydf(collected._df)


class DataFrame(pl.DataFrame, Generic[ModelType]):
Expand All @@ -35,12 +105,12 @@ class DataFrame(pl.DataFrame, Generic[ModelType]):
`DataFrame.validate()`, `DataFrame.derive()`, and so on.
"""

model: ModelType
model: Type[ModelType]

@classmethod
def _construct_dataframe_model_class(
cls: Type[DF], model: Model
) -> DataFrame[Model]:
cls: Type[DF], model: Type[OtherModelType]
) -> Type[DataFrame[OtherModelType]]:
"""
Return custom DataFrame sub-class where DataFrame.model is set.
Expand All @@ -59,12 +129,24 @@ def _construct_dataframe_model_class(
(cls,), # type: ignore
{"model": model},
)
new_class._lazyframe_class = type( # type: ignore
f"{model.__class__.__name__}LazyFrame",
(new_class._lazyframe_class,), # type: ignore
{"_dataframe_class": new_class},
)
return cast("DataFrame[Model]", new_class)
return new_class

def lazy(self: DataFrame[ModelType]) -> LazyFrame[ModelType]:
    """
    Convert DataFrame into LazyFrame.

    See documentation of polars.DataFrame.lazy() for full description.

    Returns:
        A new LazyFrame object where the patito model of this DataFrame, if
        any, is carried over so that it is re-attached at collection time.
    """
    # NOTE: the factory returns a *class*, not an instance, so the correct
    # annotation is Type[LazyFrame[...]] (the original annotated it as an
    # instance, which is what the `type: ignore` papered over).
    lazyframe_class: Type[LazyFrame[ModelType]] = (
        LazyFrame._construct_lazyframe_model_class(
            model=getattr(self, "model", None)
        )
    )
    ldf = lazyframe_class._from_pyldf(super().lazy()._ldf)
    return ldf

def set_model(self, model): # noqa: ANN001, ANN201
"""
Expand Down Expand Up @@ -232,17 +314,16 @@ def derive(self: DF) -> DF:
"Can not derive dataframe column from type "
f"{type(derived_from)}."
)
return df.collect()
return cast(DF, df.collect())

def fill_null(
self: DF,
strategy: Union[
value: Optional[Any] = None,
strategy: Optional[
Literal[
"backward", "forward", "min", "max", "mean", "one", "zero", "defaults"
],
pl.Expr,
Any,
],
"forward", "backward", "min", "max", "mean", "zero", "one", "defaults"
]
] = None,
limit: Optional[int] = None,
) -> DF:
"""
Expand All @@ -252,6 +333,7 @@ def fill_null(
are used to fill missing values.
Args:
value: Value used to fill null values.
strategy: Accepts the same arguments as `polars.DataFrame.fill_null` in
addition to `"defaults"` which will use the field's default value if
provided.
Expand All @@ -263,10 +345,10 @@ def fill_null(
parameter.
"""
if strategy != "defaults": # pragma: no cover
# Support older versions of polars without the limit argument
if limit is not None:
return super().fill_null(strategy=strategy, limit=limit)
return super().fill_null(strategy=strategy)
return cast(
DF,
super().fill_null(value=value, strategy=strategy, limit=limit),
)
return self.with_columns(
[
pl.col(column).fill_null(pl.lit(default_value))
Expand Down Expand Up @@ -348,3 +430,31 @@ def read_csv(cls: Type[DF], *args, **kwargs) -> DF: # noqa: ANN
kwargs.setdefault("new_columns", cls.model.columns)
df = cls.model.DataFrame._from_pydf(pl.read_csv(*args, **kwargs)._df)
return df.derive()

# --- Type annotation overrides ---
def filter(  # noqa: D102
    self: DF,
    predicate: Union[pl.Expr, str, pl.Series, list[bool], np.ndarray[Any, Any]],
) -> DF:
    # Delegate to polars, then narrow the static type back to this subclass.
    filtered = super().filter(predicate=predicate)
    return cast(DF, filtered)

def select(  # noqa: D102
    self: DF,
    exprs: Union[str, pl.Expr, pl.Series, Sequence[Union[str, pl.Expr, pl.Series]]],
) -> DF:
    # Delegate to polars, then narrow the static type back to this subclass.
    selected = super().select(exprs=exprs)
    return cast(DF, selected)

def with_column(self: DF, column: Union[pl.Series, pl.Expr]) -> DF:  # noqa: D102
    # Delegate to polars, then narrow the static type back to this subclass.
    extended = super().with_column(column=column)
    return cast(DF, extended)

def with_columns(  # noqa: D102
    self: DF,
    exprs: Union[
        pl.Expr,
        pl.Series,
        Sequence[Union[pl.Expr, pl.Series]],
        None,
    ] = None,
    **named_exprs: Union[pl.Expr, pl.Series],
) -> DF:
    # Delegate to polars, then narrow the static type back to this subclass.
    augmented = super().with_columns(exprs=exprs, **named_exprs)
    return cast(DF, augmented)
Loading

0 comments on commit 8fc784c

Please sign in to comment.