Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kedro-datasets] Add Polars.CSVDataSet #95

Merged
merged 13 commits into from
Feb 9, 2023
15 changes: 14 additions & 1 deletion kedro-datasets/RELEASE.md
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@

# Upcoming Release:
# Upcoming Release 1.1.0:


## Major features and improvements:

* Added the following new datasets:

| Type | Description | Location |
| ------------------------------------ | -------------------------------------------------------------------------- | ----------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning-fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
wmoreiraa marked this conversation as resolved.
Show resolved Hide resolved

## Bug fixes and other changes


# Release 1.0.2:

Expand All @@ -13,6 +25,7 @@
## Bug fixes and other changes
* Fixed doc string formatting in `VideoDataSet` causing the documentation builds to fail.


# Release 1.0.0:

First official release of Kedro-Datasets.
Expand Down
8 changes: 8 additions & 0 deletions kedro-datasets/kedro_datasets/polars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""``AbstractDataSet`` implementations that produce pandas DataFrames."""

__all__ = ["CSVDataSet"]

from contextlib import suppress

with suppress(ImportError):
from .csv_dataset import CSVDataSet
191 changes: 191 additions & 0 deletions kedro-datasets/kedro_datasets/polars/csv_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the CSV file.
"""
import logging
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import polars as pl
from kedro.io.core import (
PROTOCOL_DELIMITER,
AbstractVersionedDataSet,
DataSetError,
Version,
get_filepath_str,
get_protocol_and_path,
)

logger = logging.getLogger(__name__)


class CSVDataSet(AbstractVersionedDataSet[pl.DataFrame, pl.DataFrame]):
    """``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
    filesystem (e.g.: local, S3, GCS). It uses polars to handle the CSV file.

    Example adding a catalog entry with
    `YAML API
    <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        >>> cars:
        >>>   type: polars.CSVDataSet
        >>>   filepath: data/01_raw/company/cars.csv
        >>>   load_args:
        >>>     sep: ","
        >>>     parse_dates: False
        >>>   save_args:
        >>>     has_header: False
        >>>     null_value: "somenullstring"
        >>>
        >>> motorbikes:
        >>>   type: polars.CSVDataSet
        >>>   filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
        >>>   credentials: dev_s3

    Example using Python API:
    ::

        >>> from kedro_datasets.polars import CSVDataSet
        >>> import polars as pl
        >>>
        >>> data = pl.DataFrame({'col1': [1, 2], 'col2': [4, 5],
        >>>                      'col3': [5, 6]})
        >>>
        >>> data_set = CSVDataSet(filepath="test.csv")
        >>> data_set.save(data)
        >>> reloaded = data_set.load()
        >>> assert data.frame_equal(reloaded)

    """

    # Defaults merged with user-supplied ``load_args``/``save_args`` in
    # ``__init__``; user values take precedence.
    DEFAULT_LOAD_ARGS = {"rechunk": True}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``CSVDataSet`` pointing to a concrete CSV file
        on a specific filesystem.

        Args:
            filepath: Filepath in POSIX format to a CSV file prefixed with a protocol
                `s3://`.
                If prefix is not provided, `file` protocol (local filesystem)
                will be used.
                The prefix should be any protocol supported by ``fsspec``.
                Note: `http(s)` doesn't support versioning.
            load_args: Polars options for loading CSV files.
                Here you can find all available arguments:
                https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.read_csv.html#polars.read_csv
                All defaults are preserved, but we explicitly use `rechunk=True` for `seaborn`
                compatibility.
            save_args: Polars options for saving CSV files.
                Here you can find all available arguments:
                https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_csv.html
                All defaults are preserved.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class constructor
                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
        """
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            # Create missing parent directories when saving to the local
            # filesystem, matching the behaviour of the pandas datasets.
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        # ``storage_options`` must come from ``fs_args``/``credentials`` so that
        # the fsspec filesystem and polars agree on them; drop (with a warning)
        # any copy smuggled in via load/save args.
        if "storage_options" in self._save_args or "storage_options" in self._load_args:
            logger.warning(
                "Dropping 'storage_options' for %s, "
                "please specify them under 'fs_args' or 'credentials'.",
                self._filepath,
            )
            self._save_args.pop("storage_options", None)
            self._load_args.pop("storage_options", None)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict describing the dataset's configuration."""
        return {
            "filepath": self._filepath,
            "protocol": self._protocol,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "version": self._version,
        }

    def _load(self) -> pl.DataFrame:
        """Reads the (possibly versioned) CSV file into a polars DataFrame."""
        load_path = str(self._get_load_path())
        if self._protocol == "file":
            # file:// protocol seems to misbehave on Windows
            # (<urlopen error file not on local host>),
            # so we don't join that back to the filepath;
            # storage_options also don't work with local paths
            return pl.read_csv(load_path, **self._load_args)

        load_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
        return pl.read_csv(
            load_path, storage_options=self._storage_options, **self._load_args
        )

    def _save(self, data: pl.DataFrame) -> None:
        """Writes the DataFrame as CSV to the (possibly versioned) save path."""
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        # Serialise to an in-memory buffer first, then write the bytes through
        # fsspec, so that remote filesystems (S3, GCS, ...) are supported.
        buf = BytesIO()
        data.write_csv(file=buf, **self._save_args)

        with self._fs.open(save_path, mode="wb") as fs_file:
            fs_file.write(buf.getvalue())

        self._invalidate_cache()

    def _exists(self) -> bool:
        """Checks whether the dataset's (versioned) file exists.

        Returns False when no load version can be resolved yet.
        """
        try:
            load_path = get_filepath_str(self._get_load_path(), self._protocol)
        except DataSetError:
            return False

        return self._fs.exists(load_path)

    def _release(self) -> None:
        super()._release()
        self._invalidate_cache()

    def _invalidate_cache(self) -> None:
        """Invalidate underlying filesystem caches."""
        filepath = get_filepath_str(self._filepath, self._protocol)
        self._fs.invalidate_cache(filepath)
4 changes: 4 additions & 0 deletions kedro-datasets/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SPARK = "pyspark>=2.2, <4.0"
HDFS = "hdfs>=2.5.8, <3.0"
S3FS = "s3fs>=0.3.0, <0.5"
POLARS = "polars~=0.15.16"

with open("requirements.txt", "r", encoding="utf-8") as f:
install_requires = [x.strip() for x in f if x.strip()]
Expand Down Expand Up @@ -62,6 +63,7 @@ def _collect_requirements(requires):
"pandas.GenericDataSet": [PANDAS],
}
pillow_require = {"pillow.ImageDataSet": ["Pillow~=9.0"]}
polars_require = {"polars.CSVDataSet": [POLARS],}
video_require = {
"video.VideoDataSet": ["opencv-python~=4.5.5.64"]
}
Expand Down Expand Up @@ -107,6 +109,7 @@ def _collect_requirements(requires):
"networkx": _collect_requirements(networkx_require),
"pandas": _collect_requirements(pandas_require),
"pillow": _collect_requirements(pillow_require),
"polars": _collect_requirements(polars_require),
"video": _collect_requirements(video_require),
"plotly": _collect_requirements(plotly_require),
"redis": _collect_requirements(redis_require),
Expand All @@ -123,6 +126,7 @@ def _collect_requirements(requires):
**networkx_require,
**pandas_require,
**pillow_require,
**polars_require,
**video_require,
**plotly_require,
**spark_require,
Expand Down
1 change: 1 addition & 0 deletions kedro-datasets/test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pandas-gbq>=0.12.0, <0.18.0
pandas~=1.3 # 1.3 for read_xml/to_xml
Pillow~=9.0
plotly>=4.8.0, <6.0
polars~=0.15.13
pre-commit>=2.9.2, <3.0 # The hook `mypy` requires pre-commit version 2.9.2.
psutil==5.8.0
pyarrow>=1.0, <7.0
Expand Down
Empty file.
Loading