Skip to content

Commit

Permalink
[kedro-datasets ] Add Polars.CSVDataSet (kedro-org#95)
Browse files Browse the repository at this point in the history
Signed-off-by: wmoreiraa <walber3@gmail.com>

Signed-off-by: Danny Farah <danny_farah@mckinsey.com>
  • Loading branch information
wmoreiraa authored and dannyrfar committed Mar 21, 2023
1 parent 5115607 commit 4b5da98
Show file tree
Hide file tree
Showing 7 changed files with 594 additions and 1 deletion.
15 changes: 14 additions & 1 deletion kedro-datasets/RELEASE.md
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@

# Upcoming Release:
# Upcoming Release 1.1.0:


## Major features and improvements:

* Added the following new datasets:

| Type | Description | Location |
| ------------------------------------ | -------------------------------------------------------------------------- | ----------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |

## Bug fixes and other changes


# Release 1.0.2:

Expand All @@ -13,6 +25,7 @@
## Bug fixes and other changes
* Fixed doc string formatting in `VideoDataSet` causing the documentation builds to fail.


# Release 1.0.0:

First official release of Kedro-Datasets.
Expand Down
8 changes: 8 additions & 0 deletions kedro-datasets/kedro_datasets/polars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""``AbstractDataSet`` implementations that produce polars DataFrames."""

__all__ = ["CSVDataSet"]

from contextlib import suppress

# Best-effort import: an ``ImportError`` raised while importing ``csv_dataset``
# is suppressed so that importing this package does not fail
# (presumably this covers the optional polars dependency being absent).
with suppress(ImportError):
    from .csv_dataset import CSVDataSet
191 changes: 191 additions & 0 deletions kedro-datasets/kedro_datasets/polars/csv_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the CSV file.
"""
import logging
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import polars as pl
from kedro.io.core import (
PROTOCOL_DELIMITER,
AbstractVersionedDataSet,
DataSetError,
Version,
get_filepath_str,
get_protocol_and_path,
)

# Module-level logger; used to warn when 'storage_options' is dropped from load/save args.
logger = logging.getLogger(__name__)


class CSVDataSet(AbstractVersionedDataSet[pl.DataFrame, pl.DataFrame]):
    """``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
    filesystem (e.g.: local, S3, GCS). It uses polars to handle the CSV file.

    Example adding a catalog entry with
    `YAML API
    <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        >>> cars:
        >>>   type: polars.CSVDataSet
        >>>   filepath: data/01_raw/company/cars.csv
        >>>   load_args:
        >>>     sep: ","
        >>>     parse_dates: False
        >>>   save_args:
        >>>     has_header: False
        >>>     null_value: "somenullstring"
        >>>
        >>> motorbikes:
        >>>   type: polars.CSVDataSet
        >>>   filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
        >>>   credentials: dev_s3

    Example using Python API:
    ::

        >>> from kedro_datasets.polars import CSVDataSet
        >>> import polars as pl
        >>>
        >>> data = pl.DataFrame({'col1': [1, 2], 'col2': [4, 5],
        >>>                      'col3': [5, 6]})
        >>>
        >>> data_set = CSVDataSet(filepath="test.csv")
        >>> data_set.save(data)
        >>> reloaded = data_set.load()
        >>> assert data.frame_equal(reloaded)

    """

    DEFAULT_LOAD_ARGS = {"rechunk": True}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``CSVDataSet`` pointing to a concrete CSV file
        on a specific filesystem.

        Args:
            filepath: Filepath in POSIX format to a CSV file, optionally prefixed
                with a protocol such as `s3://`.
                If prefix is not provided, `file` protocol (local filesystem)
                will be used.
                The prefix should be any protocol supported by ``fsspec``.
                Note: `http(s)` doesn't support versioning.
            load_args: Polars options for loading CSV files.
                Here you can find all available arguments:
                https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.read_csv.html#polars.read_csv
                All defaults are preserved, but we explicitly use `rechunk=True`
                for `seaborn` compatibility.
            save_args: Polars options for saving CSV files.
                Here you can find all available arguments:
                https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_csv.html
                All defaults are preserved.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            credentials: Credentials required to get access to the underlying
                filesystem. E.g. for ``GCSFileSystem`` it should look like
                `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class
                constructor (e.g. `{"project": "my-project"}` for
                ``GCSFileSystem``).
        """
        # Work on copies so the caller's dictionaries are never mutated.
        filesystem_options = deepcopy(fs_args) or {}
        credential_options = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            filesystem_options.setdefault("auto_mkdir", True)

        self._protocol = protocol
        # ``fs_args`` entries take precedence over ``credentials`` on key clashes.
        self._storage_options = {**credential_options, **filesystem_options}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Class-level defaults first, then whatever the caller supplied on top.
        self._load_args = self._merge_args(self.DEFAULT_LOAD_ARGS, load_args)
        self._save_args = self._merge_args(self.DEFAULT_SAVE_ARGS, save_args)

        # Filesystem options are managed by this class, never by load/save args.
        configured_args = (self._save_args, self._load_args)
        if any("storage_options" in args for args in configured_args):
            logger.warning(
                "Dropping 'storage_options' for %s, "
                "please specify them under 'fs_args' or 'credentials'.",
                self._filepath,
            )
            for args in configured_args:
                args.pop("storage_options", None)

    @staticmethod
    def _merge_args(defaults, overrides):
        """Return a deep copy of ``defaults`` updated with ``overrides`` (if given)."""
        merged = deepcopy(defaults)
        if overrides is not None:
            merged.update(overrides)
        return merged

    def _describe(self) -> Dict[str, Any]:
        """Return the attributes that describe this data set instance."""
        return dict(
            filepath=self._filepath,
            protocol=self._protocol,
            load_args=self._load_args,
            save_args=self._save_args,
            version=self._version,
        )

    def _load(self) -> pl.DataFrame:
        """Read the CSV at the resolved load path into a polars DataFrame."""
        load_path = str(self._get_load_path())

        if self._protocol != "file":
            # Non-local protocols get the protocol prefix re-attached and the
            # filesystem options forwarded to polars.
            prefixed_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
            return pl.read_csv(
                prefixed_path,
                storage_options=self._storage_options,
                **self._load_args,
            )

        # file:// protocol seems to misbehave on Windows
        # (<urlopen error file not on local host>),
        # so we don't join that back to the filepath;
        # storage_options also don't work with local paths
        return pl.read_csv(load_path, **self._load_args)

    def _save(self, data: pl.DataFrame) -> None:
        """Serialise ``data`` to CSV in memory and write it through the filesystem."""
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        csv_buffer = BytesIO()
        data.write_csv(file=csv_buffer, **self._save_args)
        payload = csv_buffer.getvalue()

        with self._fs.open(save_path, mode="wb") as target:
            target.write(payload)

        self._invalidate_cache()

    def _exists(self) -> bool:
        """Return whether the load path exists; ``False`` if it cannot be resolved."""
        try:
            load_path = get_filepath_str(self._get_load_path(), self._protocol)
        except DataSetError:
            return False
        else:
            return self._fs.exists(load_path)

    def _release(self) -> None:
        """Run the parent release logic, then invalidate the filesystem cache."""
        super()._release()
        self._invalidate_cache()

    def _invalidate_cache(self) -> None:
        """Invalidate underlying filesystem caches."""
        self._fs.invalidate_cache(get_filepath_str(self._filepath, self._protocol))
4 changes: 4 additions & 0 deletions kedro-datasets/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# Version specifiers for optional dependencies
# (``POLARS`` is consumed by ``polars_require`` further down).
SPARK = "pyspark>=2.2, <4.0"
HDFS = "hdfs>=2.5.8, <3.0"
S3FS = "s3fs>=0.3.0, <0.5"
POLARS = "polars~=0.15.16"

# Core requirements: every non-blank line of requirements.txt, whitespace-stripped.
with open("requirements.txt", "r", encoding="utf-8") as f:
    install_requires = [line for line in map(str.strip, f) if line]
Expand Down Expand Up @@ -63,6 +64,7 @@ def _collect_requirements(requires):
"pandas.GenericDataSet": [PANDAS],
}
# Requirement lists keyed by dataset name for the pillow, polars and video groups.
pillow_require = {"pillow.ImageDataSet": ["Pillow~=9.0"]}
polars_require = {"polars.CSVDataSet": [POLARS]}
video_require = {"video.VideoDataSet": ["opencv-python~=4.5.5.64"]}
Expand Down Expand Up @@ -109,6 +111,7 @@ def _collect_requirements(requires):
"networkx": _collect_requirements(networkx_require),
"pandas": _collect_requirements(pandas_require),
"pillow": _collect_requirements(pillow_require),
"polars": _collect_requirements(polars_require),
"video": _collect_requirements(video_require),
"plotly": _collect_requirements(plotly_require),
"redis": _collect_requirements(redis_require),
Expand All @@ -126,6 +129,7 @@ def _collect_requirements(requires):
**networkx_require,
**pandas_require,
**pillow_require,
**polars_require,
**video_require,
**plotly_require,
**spark_require,
Expand Down
1 change: 1 addition & 0 deletions kedro-datasets/test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pandas-gbq>=0.12.0, <0.18.0
pandas~=1.3 # 1.3 for read_xml/to_xml
Pillow~=9.0
plotly>=4.8.0, <6.0
polars~=0.15.13
pre-commit>=2.9.2, <3.0 # The hook `mypy` requires pre-commit version 2.9.2.
psutil==5.8.0
pyarrow>=1.0, <7.0
Expand Down
Empty file.
Loading

0 comments on commit 4b5da98

Please sign in to comment.