-
Notifications
You must be signed in to change notification settings - Fork 92
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(datasets): support Polars lazy evaluation (#350)
* feat(datasets) add PolarsDataset to support Polars's Lazy API Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * Fix(datasets): rename PolarsDataSet to PolarsDataSet Add PolarsDataSet as an alias for PolarsDataset with deprecation warning. Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * Fix(datasets): apply ruff linting rules Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * Fix(datasets): Correct pattern matching when Raising exceptions Corrected PolarsDataSet to PolarsDataset in the pattern to match in test_load_missing_file Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * fix(datasets): clean up PolarsDataset related code Remove reference to PolarsDataSet as this is not required for new dataset implementations. Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * feat(datasets): Rename Polars Datasets to better describe their intent Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * feat(datasets): clean up LazyPolarsDataset Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * fix(datasets): increase test coverage for PolarsDataset classes Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * docs(datasets): add renamed Polars datasets to docs Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * docs(datasets): Add new polars datasets to release notes Signed-off-by: Matthias Roels <mroels2@its.jnj.com> * fix(datasets): load_args not properly passed to LazyPolarsDataset.load Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> * docs(datasets): fix spelling error in release notes Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> --------- Signed-off-by: Matthias Roels <matthias.roels21@gmail.com> Signed-off-by: Matthias Roels <mroels2@its.jnj.com> Signed-off-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Co-authored-by: Matthias Roels <mroels2@its.jnj.com> Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com>
- Loading branch information
1 parent
7140080
commit b44bf0e
Showing
9 changed files
with
741 additions
and
57 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
248 changes: 248 additions & 0 deletions
248
kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,248 @@ | ||
"""``LazyPolarsDataset`` loads/saves data from/to a data file using an underlying | ||
filesystem (e.g.: local, S3, GCS). It uses polars to handle the | ||
type of read/write target. | ||
""" | ||
import logging | ||
from copy import deepcopy | ||
from io import BytesIO | ||
from pathlib import PurePosixPath | ||
from typing import Any, ClassVar, Dict, Optional, Union | ||
|
||
import fsspec | ||
import polars as pl | ||
import pyarrow.dataset as ds | ||
from kedro.io.core import ( | ||
AbstractVersionedDataSet, | ||
DatasetError, | ||
Version, | ||
get_filepath_str, | ||
get_protocol_and_path, | ||
) | ||
|
||
ACCEPTED_FILE_FORMATS = ["csv", "parquet"] | ||
|
||
PolarsFrame = Union[pl.LazyFrame, pl.DataFrame] | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class LazyPolarsDataset(AbstractVersionedDataSet[pl.LazyFrame, PolarsFrame]): | ||
"""``LazyPolarsDataset`` loads/saves data from/to a data file using an | ||
underlying filesystem (e.g.: local, S3, GCS). It uses Polars to handle | ||
the type of read/write target. It uses lazy loading with Polars Lazy API, but it can | ||
save both Lazy and Eager Polars DataFrames. | ||
Example usage for the `YAML API <https://kedro.readthedocs.io/en/stable/data/\ | ||
data_catalog_yaml_examples.html>`_: | ||
.. code-block:: yaml | ||
>>> cars: | ||
>>> type: polars.LazyPolarsDataset | ||
>>> filepath: data/01_raw/company/cars.csv | ||
>>> load_args: | ||
>>> sep: "," | ||
>>> parse_dates: False | ||
>>> save_args: | ||
>>> has_header: False | ||
null_value: "somenullstring" | ||
>>> | ||
>>> motorbikes: | ||
>>> type: polars.LazyPolarsDataset | ||
>>> filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv | ||
>>> credentials: dev_s3 | ||
Example using Python API: | ||
:: | ||
>>> from kedro_datasets.polars import LazyPolarsDataset | ||
>>> import polars as pl | ||
>>> | ||
>>> data = pl.DataFrame({'col1': [1, 2], 'col2': [4, 5], | ||
>>> 'col3': [5, 6]}) | ||
>>> | ||
>>> data_set = LazyPolarsDataset(filepath="test.csv") | ||
>>> data_set.save(data) | ||
>>> reloaded = data_set.load() | ||
>>> assert data.frame_equal(reloaded) | ||
""" | ||
|
||
DEFAULT_LOAD_ARGS: ClassVar[Dict[str, Any]] = {} | ||
DEFAULT_SAVE_ARGS: ClassVar[Dict[str, Any]] = {} | ||
|
||
def __init__( # noqa: PLR0913 | ||
self, | ||
filepath: str, | ||
file_format: str, | ||
load_args: Optional[Dict[str, Any]] = None, | ||
save_args: Optional[Dict[str, Any]] = None, | ||
version: Version = None, | ||
credentials: Optional[Dict[str, Any]] = None, | ||
fs_args: Optional[Dict[str, Any]] = None, | ||
metadata: Optional[Dict[str, Any]] = None, | ||
) -> None: | ||
"""Creates a new instance of ``LazyPolarsDataset`` pointing to a concrete | ||
data file on a specific filesystem. | ||
Args: | ||
filepath: Filepath in POSIX format to a file prefixed with a protocol like | ||
`s3://`. | ||
If prefix is not provided, `file` protocol (local filesystem) | ||
will be used. | ||
The prefix should be any protocol supported by ``fsspec``. | ||
Key assumption: The first argument of either load/save method points to | ||
a filepath/buffer/io type location. There are some read/write targets | ||
such as 'clipboard' or 'records' that will fail since they do not take a | ||
filepath like argument. | ||
file_format: String which is used to match the appropriate load/save method | ||
on a best effort basis. For example if 'csv' is passed the | ||
`polars.read_csv` and | ||
`polars.DataFrame.write_csv` methods will be identified. An error will | ||
be raised unless | ||
at least one matching `read_{file_format}` or `write_{file_format}`. | ||
load_args: polars options for loading files. | ||
Here you can find all available arguments: | ||
https://pola-rs.github.io/polars/py-polars/html/reference/io.html | ||
All defaults are preserved. | ||
save_args: Polars options for saving files. | ||
Here you can find all available arguments: | ||
https://pola-rs.github.io/polars/py-polars/html/reference/io.html | ||
All defaults are preserved. | ||
version: If specified, should be an instance of | ||
``kedro.io.core.Version``. If its ``load`` attribute is | ||
None, the latest version will be loaded. If its ``save`` | ||
attribute is None, save version will be autogenerated. | ||
credentials: Credentials required to get access to the underlying filesystem. | ||
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. | ||
fs_args: Extra arguments to pass into underlying filesystem class constructor | ||
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as | ||
to pass to the filesystem's `open` method through nested keys | ||
`open_args_load` and `open_args_save`. | ||
Here you can find all available arguments for `open`: | ||
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open | ||
All defaults are preserved, except `mode`, which is set to `r` when loading | ||
and to `w` when saving. | ||
metadata: Any arbitrary metadata. | ||
This is ignored by Kedro, but may be consumed by users or external plugins. | ||
Raises: | ||
DatasetError: Will be raised if at least less than one appropriate | ||
read or write methods are identified. | ||
""" | ||
self._file_format = file_format.lower() | ||
|
||
if self._file_format not in ACCEPTED_FILE_FORMATS: | ||
raise DatasetError( | ||
f"'{self._file_format}' is not an accepted format " | ||
f"({ACCEPTED_FILE_FORMATS}) ensure that your 'file_format' parameter " | ||
"has been defined correctly as per the Polars API " | ||
"https://pola-rs.github.io/polars/py-polars/html/reference/io.html" | ||
) | ||
|
||
_fs_args = deepcopy(fs_args) or {} | ||
_credentials = deepcopy(credentials) or {} | ||
|
||
protocol, path = get_protocol_and_path(filepath, version) | ||
if protocol == "file": | ||
_fs_args.setdefault("auto_mkdir", True) | ||
|
||
self._protocol = protocol | ||
self._storage_options = {**_credentials, **_fs_args} | ||
self._fs = fsspec.filesystem(self._protocol, **self._storage_options) | ||
|
||
self.metadata = metadata | ||
|
||
super().__init__( | ||
filepath=PurePosixPath(path), | ||
version=version, | ||
exists_function=self._fs.exists, | ||
glob_function=self._fs.glob, | ||
) | ||
|
||
# Handle default load and save arguments | ||
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) | ||
if load_args is not None: | ||
self._load_args.update(load_args) | ||
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) | ||
if save_args is not None: | ||
self._save_args.update(save_args) | ||
|
||
if "storage_options" in self._save_args or "storage_options" in self._load_args: | ||
logger.warning( | ||
"Dropping 'storage_options' for %s, " | ||
"please specify them under 'fs_args' or 'credentials'.", | ||
self._filepath, | ||
) | ||
self._save_args.pop("storage_options", None) | ||
self._load_args.pop("storage_options", None) | ||
|
||
def _describe(self) -> Dict[str, Any]: | ||
return { | ||
"filepath": self._filepath, | ||
"protocol": self._protocol, | ||
"load_args": self._load_args, | ||
"save_args": self._save_args, | ||
"version": self._version, | ||
} | ||
|
||
def _load(self) -> pl.LazyFrame: | ||
load_path = str(self._get_load_path()) | ||
|
||
if self._protocol == "file": | ||
# With local filesystems, we can use Polar's build-in I/O method: | ||
load_method = getattr(pl, f"scan_{self._file_format}", None) | ||
return load_method(load_path, **self._load_args) | ||
|
||
# For object storage, we use pyarrow for I/O: | ||
dataset = ds.dataset( | ||
load_path, filesystem=self._fs, format=self._file_format, **self._load_args | ||
) | ||
return pl.scan_pyarrow_dataset(dataset) | ||
|
||
def _save(self, data: Union[pl.DataFrame, pl.LazyFrame]) -> None: | ||
save_path = get_filepath_str(self._get_save_path(), self._protocol) | ||
|
||
collected_data = None | ||
if isinstance(data, pl.LazyFrame): | ||
collected_data = data.collect() | ||
else: | ||
collected_data = data | ||
|
||
# Note: polars does support writing partitioned parquet file | ||
# it is leveraging Arrow to do so, see e.g. | ||
# https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_parquet.html | ||
save_method = getattr(collected_data, f"write_{self._file_format}", None) | ||
if save_method: | ||
buf = BytesIO() | ||
save_method(file=buf, **self._save_args) | ||
with self._fs.open(save_path, mode="wb") as fs_file: | ||
fs_file.write(buf.getvalue()) | ||
self._invalidate_cache() | ||
# How the LazyPolarsDataset logic is currently written with | ||
# ACCEPTED_FILE_FORMATS and a check in the `__init__` method, | ||
# this else loop is never reached, hence we exclude it from coverage report | ||
# but leave it in for consistency between the Eager and Lazy classes | ||
else: # pragma: no cover | ||
raise DatasetError( | ||
f"Unable to retrieve 'polars.DataFrame.write_{self._file_format}' " | ||
"method, please ensure that your 'file_format' parameter has been " | ||
"defined correctly as per the Polars API" | ||
"https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html" | ||
) | ||
|
||
def _exists(self) -> bool: | ||
try: | ||
load_path = get_filepath_str(self._get_load_path(), self._protocol) | ||
except DatasetError: # pragma: no cover | ||
return False | ||
|
||
return self._fs.exists(load_path) | ||
|
||
def _release(self) -> None: | ||
super()._release() | ||
self._invalidate_cache() | ||
|
||
def _invalidate_cache(self) -> None: | ||
"""Invalidate underlying filesystem caches.""" | ||
filepath = get_filepath_str(self._filepath, self._protocol) | ||
self._fs.invalidate_cache(filepath) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.