Xarray NetCDFDataSet #165
Comments
@lucasjamar to my knowledge this hasn't been requested before - we'd absolutely appreciate a PR :) |
Always interesting to hear about communities I'm not familiar with and how they might use kedro 🙂 🎉 👍 FYI @noklam - am I right in thinking you're from geoscience originally? |
@AntonyMilneQB Close enough! I did Earth System Science. |
@lucasjamar Hi Lucas, just checking in to see if you need any help with this. |
Hi @noklam, quite busy right now, sorry. I'm hoping to have a look at this over Easter |
@lucasjamar No worries! Looking forward to seeing your PR 😀. |
@lucasjamar Are you still interested in this? |
@noklam I started during the holiday break but didn't get very far. I'm afraid I won't find the time to get any further with this... Terribly sorry |
Sorry, this is as far as I could get:
"""``GenericDataSet`` loads/saves data from/to a NetCDF file using an underlying
filesystem (e.g.: local, S3, GCS). It uses xarray to handle the NetCDF file.
"""
import logging
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import xarray as xr
from kedro.io.core import (
    PROTOCOL_DELIMITER,
    AbstractVersionedDataSet,
    DataSetError,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)

logger = logging.getLogger(__name__)


class GenericDataSet(AbstractVersionedDataSet):
    """``GenericDataSet`` loads/saves data from/to a file using an underlying
    filesystem (e.g.: local, S3, GCS). It uses xarray to handle the file.

    Example adding a catalog entry with
    `YAML API
    <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        >>> january:
        >>>   type: xarray.GenericDataSet
        >>>   filepath: data/01_raw/weather/january.nc
        >>>   load_args:
        >>>     engine: netcdf4
        >>>     decode_times: True
        >>>   save_args:
        >>>     mode: w
        >>>
        >>> motorbikes:
        >>>   type: xarray.GenericDataSet
        >>>   filepath: gcs://your_bucket/weater.ya
        >>>   credentials: dev_s3
        >>>

    Example using Python API:
    ::

        >>> from kedro.extras.datasets.xarray import GenericDataSet
        >>> import numpy as np
        >>> import pandas as pd
        >>> import xarray as xr
        >>>
        >>> data = xr.Dataset(
        >>>     {"foo": (("x", "y"), np.random.rand(4, 5))},
        >>>     coords={
        >>>         "x": [10, 20, 30, 40],
        >>>         "y": pd.date_range("2000-01-01", periods=5),
        >>>         "z": ("x", list("abcd")),
        >>>     },
        >>> )
        >>>
        >>> # data_set = GenericDataSet(filepath="gcs://bucket/test.nc")
        >>> data_set = GenericDataSet(filepath="test.nc")
        >>> data_set.save(data)
        >>> reloaded = data_set.load()
        >>> assert data.equals(reloaded)
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    # ``Dataset.to_netcdf`` has no ``index`` argument, so no save defaults are set.
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``GenericDataSet`` pointing to a concrete NetCDF file
        on a specific filesystem.

        Args:
            filepath: Filepath in POSIX format to a NetCDF file prefixed with a protocol like `s3://`.
                If prefix is not provided, `file` protocol (local filesystem) will be used.
                The prefix should be any protocol supported by ``fsspec``.
                Note: `http(s)` doesn't support versioning.
            load_args: xarray options for loading NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html
                All defaults are preserved.
            save_args: xarray options for saving NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_netcdf.html
                All defaults are preserved.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class constructor
                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
        """
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        if "storage_options" in self._save_args or "storage_options" in self._load_args:
            logger.warning(
                "Dropping `storage_options` for %s, "
                "please specify them under `fs_args` or `credentials`.",
                self._filepath,
            )
            self._save_args.pop("storage_options", None)
            self._load_args.pop("storage_options", None)

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._filepath,
            protocol=self._protocol,
            load_args=self._load_args,
            save_args=self._save_args,
            version=self._version,
        )

    def _load(self) -> xr.Dataset:
        load_path = str(self._get_load_path())
        if self._protocol == "file":
            # file:// protocol seems to misbehave on Windows
            # (<urlopen error file not on local host>),
            # so we don't join that back to the filepath;
            # storage_options also don't work with local paths
            return xr.open_dataset(load_path, **self._load_args)

        load_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
        # NOTE: ``storage_options`` is forwarded to the backend engine, and not
        # every engine accepts it, so remote loading may fail depending on the engine.
        return xr.open_dataset(
            load_path, storage_options=self._storage_options, **self._load_args
        )

    def _save(self, data: xr.Dataset) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        # With no path given, ``to_netcdf`` returns the serialised file as a
        # bytes-like object (via the scipy engine, i.e. netCDF3).
        payload = data.to_netcdf(**self._save_args)
        with self._fs.open(save_path, mode="wb") as fs_file:
            fs_file.write(payload)
        self._invalidate_cache()

    def _exists(self) -> bool:
        try:
            load_path = get_filepath_str(self._get_load_path(), self._protocol)
        except DataSetError:
            return False
        return self._fs.exists(load_path)

    def _release(self) -> None:
        super()._release()
        self._invalidate_cache()

    def _invalidate_cache(self) -> None:
        """Invalidate underlying filesystem caches."""
        filepath = get_filepath_str(self._filepath, self._protocol)
        self._fs.invalidate_cache(filepath) |
@lucasjamar Hey thanks for making time for this! Do you want to make this a PR? It would be easier for me to drop comments on it. :) |
Xarray functionality in the Data Catalog would be a big deal for me. Is there an update on this? |
No, unfortunately not. Feel free to try building on @lucasjamar's start above and raise a PR for it though 🙂 |
At a glance, making just NetCDF work is straightforward, but it seems @lucasjamar is going with a GenericXarrayDataSet. I wonder if it makes sense to take the
Here is a quick implementation, mostly just copied from above:
import logging
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import xarray as xr
from kedro.io.core import (
    PROTOCOL_DELIMITER,
    AbstractVersionedDataSet,
    DataSetError,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)

logger = logging.getLogger(__name__)


class NetCDFDataSet(AbstractVersionedDataSet):
    """``NetCDFDataSet`` loads/saves data from/to a NetCDF file using an underlying
    filesystem (e.g.: local, S3, GCS). It uses xarray to handle the file.

    Example adding a catalog entry with
    `YAML API
    <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        >>> january:
        >>>   type: xarray.NetCDFDataSet
        >>>   filepath: data/01_raw/weather/january.nc
        >>>   load_args:
        >>>     engine: netcdf4
        >>>     decode_times: True
        >>>   save_args:
        >>>     mode: w
        >>>
        >>> motorbikes:
        >>>   type: xarray.NetCDFDataSet
        >>>   filepath: gcs://your_bucket/weater.ya
        >>>   credentials: dev_s3
        >>>

    Example using Python API:
    ::

        >>> from kedro.extras.datasets.xarray import NetCDFDataSet
        >>> import numpy as np
        >>> import pandas as pd
        >>> import xarray as xr
        >>>
        >>> data = xr.Dataset(
        >>>     {"foo": (("x", "y"), np.random.rand(4, 5))},
        >>>     coords={
        >>>         "x": [10, 20, 30, 40],
        >>>         "y": pd.date_range("2000-01-01", periods=5),
        >>>         "z": ("x", list("abcd")),
        >>>     },
        >>> )
        >>>
        >>> # data_set = NetCDFDataSet(filepath="gcs://bucket/test.nc")
        >>> data_set = NetCDFDataSet(filepath="test.nc")
        >>> data_set.save(data)
        >>> reloaded = data_set.load()
        >>> assert data.equals(reloaded)
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``NetCDFDataSet`` pointing to a concrete NetCDF file
        on a specific filesystem.

        Args:
            filepath: Filepath in POSIX format to a NetCDF file prefixed with a protocol like `s3://`.
                If prefix is not provided, `file` protocol (local filesystem) will be used.
                The prefix should be any protocol supported by ``fsspec``.
                Note: `http(s)` doesn't support versioning.
            load_args: xarray options for loading NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html
                All defaults are preserved.
            save_args: xarray options for saving NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_netcdf.html
                All defaults are preserved.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class constructor
                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
                The optional keys ``open_args_load`` and ``open_args_save`` are
                split off and kept as options for opening the file.
        """
        _fs_args = deepcopy(fs_args) or {}
        _fs_open_args_load = _fs_args.pop("open_args_load", {})
        _fs_open_args_save = _fs_args.pop("open_args_save", {})
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        if "storage_options" in self._save_args or "storage_options" in self._load_args:
            logger.warning(
                "Dropping `storage_options` for %s, "
                "please specify them under `fs_args` or `credentials`.",
                self._filepath,
            )
            self._save_args.pop("storage_options", None)
            self._load_args.pop("storage_options", None)

        _fs_open_args_save.setdefault("mode", "wb")
        self._fs_open_args_load = _fs_open_args_load
        self._fs_open_args_save = _fs_open_args_save

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._filepath,
            protocol=self._protocol,
            load_args=self._load_args,
            save_args=self._save_args,
            version=self._version,
        )

    def _load(self) -> xr.Dataset:
        load_path = str(self._get_load_path())
        if self._protocol == "file":
            # file:// protocol seems to misbehave on Windows
            # (<urlopen error file not on local host>),
            # so we don't join that back to the filepath;
            # storage_options also don't work with local paths
            return xr.open_dataset(load_path, **self._load_args)

        load_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
        # NOTE: ``storage_options`` is forwarded to the backend engine, and not
        # every engine accepts it, so remote loading may fail depending on the engine.
        return xr.open_dataset(
            load_path, storage_options=self._storage_options, **self._load_args
        )

    def _save(self, data: xr.Dataset) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        # Write straight into the file object opened by the filesystem.
        with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
            data.to_netcdf(fs_file, **self._save_args)
        self._invalidate_cache()

    def _exists(self) -> bool:
        try:
            load_path = get_filepath_str(self._get_load_path(), self._protocol)
        except DataSetError:
            return False
        return self._fs.exists(load_path)

    def _release(self) -> None:
        super()._release()
        self._invalidate_cache()

    def _invalidate_cache(self) -> None:
        """Invalidate underlying filesystem caches."""
        filepath = get_filepath_str(self._filepath, self._protocol)
        self._fs.invalidate_cache(filepath) |
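The argument handling in `__init__` above can be read in isolation. The following is a minimal sketch using only the standard library; the helper name `resolve_dataset_args` is hypothetical and not part of Kedro, it only mirrors the copy, split, and merge steps of the constructor.

```python
from copy import deepcopy
from typing import Any, Dict, Optional, Tuple


def resolve_dataset_args(
    credentials: Optional[Dict[str, Any]] = None,
    fs_args: Optional[Dict[str, Any]] = None,
    protocol: str = "file",
) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper mirroring the argument handling in ``__init__``.

    Returns ``(storage_options, open_args_load, open_args_save)``.
    """
    # Copy first so the caller's dictionaries are never mutated.
    _fs_args = deepcopy(fs_args) or {}
    _credentials = deepcopy(credentials) or {}

    # Options meant for ``fs.open`` are split off before the remainder
    # is handed to the filesystem constructor.
    open_args_load = _fs_args.pop("open_args_load", {})
    open_args_save = _fs_args.pop("open_args_save", {})
    open_args_save.setdefault("mode", "wb")

    # Local paths get their parent directories created automatically.
    if protocol == "file":
        _fs_args.setdefault("auto_mkdir", True)

    # Later keys win, so ``fs_args`` overrides ``credentials`` on a clash.
    storage_options = {**_credentials, **_fs_args}
    return storage_options, open_args_load, open_args_save
```

For a `gcs` path with `credentials={"token": None}` and `fs_args={"project": "my-project"}`, both entries end up in the storage options passed to `fsspec.filesystem`, and the save mode defaults to `"wb"`.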
Would it make sense to have a |
I am actually not aware of the pandas.GenericDataSet 😅 Do people actually use that, and do we keep both for backward compatibility reasons? I think it is possible since the API is quite similar. By the way, I saw that the invalidate_cache method is public for some datasets but private for the majority. Would it make sense to remove it from the individual datasets and put it in the abstract class? We have repeated it 71 times 😅 |
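The suggestion to define cache invalidation once on a shared base class could look roughly like this. It is a sketch under stated assumptions: `FsBackedDataSetBase`, `NetCDFLikeDataSet`, and `RecordingFs` are hypothetical names for illustration, not Kedro classes, and the only real API assumed is the `invalidate_cache(path)` method of fsspec filesystems.

```python
from typing import Any, List, Optional


class FsBackedDataSetBase:
    """Hypothetical base class (not Kedro's ``AbstractVersionedDataSet``)."""

    def __init__(self, fs: Any, filepath: str) -> None:
        self._fs = fs
        self._filepath = filepath

    def _invalidate_cache(self) -> None:
        """Invalidate filesystem caches; defined once and inherited."""
        self._fs.invalidate_cache(self._filepath)

    def _release(self) -> None:
        self._invalidate_cache()


class NetCDFLikeDataSet(FsBackedDataSetBase):
    """Subclass that inherits cache invalidation without redefining it."""


class RecordingFs:
    """Stand-in for an fsspec filesystem that records invalidation calls."""

    def __init__(self) -> None:
        self.invalidated: List[Optional[str]] = []

    def invalidate_cache(self, path: Optional[str] = None) -> None:
        self.invalidated.append(path)
```

Each concrete dataset would then drop its own copy of `_invalidate_cache`, which is the duplication the comment above refers to.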
Yeah,
Probably yes. The same could probably be said for |
Not sure if I like the idea of deprecating the dedicated one, but this is not urgent so I will think more about it. I added the |
Yeah, the main reason why I pushed for The pandas API is also really inconsistent; for file paths I had to rely on the assumption that the first positional argument is going to be the path argument, since the library uses a different In the Spark world we're able to provide only a generic version of the dataset since it's much better designed. I think in situations like |
I really appreciate the responses. 🙏 Re: the "pandas approach" mentioned by @noklam, I think there are certainly a few xarray-supported file types that could benefit from built-in handling in kedro. NetCDF, Zarr, and GeoTIFF (via rioxarray) specifically. Maybe that supports moving away from a generic dataset, but I also appreciate the difficulty in supporting everything. In the selfish short term it looks like I can use the implementation provided by @noklam above in combination with these instructions to get done what I need to get done. I can test out these implementations and report back changes I see as necessary. |
Definitely that sounds like a good plan in the short term - you can easily use the custom dataset implementation without it being added to core kedro. Just copy and paste the above into a file and modify the dataset |
An update and a couple questions about partitioned datasets and lazy evaluation. The implementation discussed above works well. I've only tested it on local file systems. TL;DR: To not lose advantages of working with xarray we need to concatenate many .nc files into a big dataset and then operate on it as lazily as possible. I want to keep benefits of both xarray and Kedro.
More on this: This approach (example shown below) works, but I'm not sure if it is loading all this data into memory and ruining the lazy evaluation aspect. The only way I know to test this is to In this pipeline I would have a node defined in
With the corresponding function definition (which as shown here is a very simplified/broken example to highlight dictionary creation aspect) in
I've used this pattern in multiple places, and it works, but again it's not clear to me that it is the best way to go. Another example: when I start a processing pipeline on this downloaded data, I open the downloaded data files but concatenate them into a single, large (but not loaded into memory) xarray.Dataset. In doing so (as shown below) I am attempting to do what xr.open_mfdataset will do when given a directory full of
Where the
and where the
|
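The pattern described in the comment above, where a node receives a dictionary of partitions and combines them into one dataset, can be sketched as follows. This assumes Kedro's `PartitionedDataSet` convention of mapping partition IDs to load callables; the function name `combine_partitions` and its `combine` parameter are hypothetical. Whether the data stays out of memory depends entirely on what each loader returns and on the combine step, not on this function.

```python
from typing import Any, Callable, Dict, List


def combine_partitions(
    partitions: Dict[str, Callable[[], Any]],
    combine: Callable[[List[Any]], Any],
) -> Any:
    """Open every partition in sorted key order and combine the results.

    Nothing is opened until this function runs, because the dictionary
    holds load callables rather than loaded objects.
    """
    opened = [partitions[key]() for key in sorted(partitions)]
    return combine(opened)
```

With xarray, `combine` could be something like `lambda parts: xr.concat(parts, dim="time")`, where the dimension name is an assumption. As far as I know, the concatenation only stays lazy if each loader opens its file with dask-backed chunks (for example `chunks={}` in `load_args`), so that is worth checking before relying on it.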
@jamespolly thank you for your very detailed write up - @deepyaman do you want to chime in with your thoughts? |
One issue I've run into with |
Talking to some folks working with satellite data, would be cool to have this 🛰️ |
Looking to open a PR on this in the next week or so. |
@riley-brady that's awesome! |
As a PyMC user I am excited by this news! |
Glad to see the excitement! I'm bundling it with a Zarr implementation, since those are commonly used together. We've got a nicely working implementation on our climate team for both, targeting an AWS platform. Let's start with stripping it down to a local-working implementation and then see how we can build it up from there. Zarr should work out of the box for remote read/write. NetCDF can't be read remotely natively (e.g. from an S3 bucket), so we pull it down to temp storage and then do the read. One could also do something like a |
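The "pull it down to temp storage and then do the read" approach mentioned above might be sketched like this. It is not the implementation from the team's codebase: `fetch_to_temp` is a hypothetical helper, and `fs` is assumed to be any object with an fsspec-style `get(rpath, lpath)` method.

```python
import tempfile
from pathlib import Path, PurePosixPath
from typing import Any


def fetch_to_temp(fs: Any, remote_path: str) -> Path:
    """Copy ``remote_path`` into a fresh temporary directory.

    The caller owns the returned file and should delete its directory
    once the dataset opened from it has been closed.
    """
    tmp_dir = Path(tempfile.mkdtemp(prefix="netcdf_sync_"))
    # Keep the original file name so the extension stays recognisable.
    local_path = tmp_dir / PurePosixPath(remote_path).name
    fs.get(remote_path, str(local_path))
    return local_path
```

With a real filesystem this would be used along the lines of `xr.open_dataset(fetch_to_temp(fsspec.filesystem("s3"), "my-bucket/raw/january.nc"))`, where the bucket and key are placeholders. The temporary directory is deliberately not removed inside the helper, because xarray reads lazily and the file must outlive the call.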
I talked to @tgoelles today and he is interested to contribute a dataset for geotiff and NetCDF. I am excited to see more contributions from the scientific community🔥 |
@noklam I implemented GeoTIFF for now, but the build on GitHub Actions fails for Windows due to the dependency on GDAL. The fork is here: https://github.com/tgoelles/kedro-plugins Collecting rasterio (from rioxarray>=0.9.0->kedro-datasets==1.7.0) Getting requirements to build wheel did not run successfully. [2 lines of output] Is there a way to support GeoTIFF only for Ubuntu for now? I don't want to look into Windows issues, and I know that GDAL makes installations complex |
I can help with GDAL on Windows 👍🏽 go ahead and open the PR! |
I'm also really looking forward to this PR! |
Hi folks! I put a decent stake in the ground here: https://github.com/kedro-org/kedro-plugins/pull/360/files. There are a few TODOs to deal with at the moment. I have some pressing work deadlines I need to focus on, so I might not get back to this for a couple of weeks. But I wanted to provide the initial snippet for folks to test out and provide feedback on instead of going silent here. I'd like to get this implemented with file syncing for load-from-remote, since it's the most straightforward. A future PR could work with kerchunk to allow direct loading from remote storage. This is a really nice toolkit, but it requires management of a lot of generated JSON metadata files, which can sometimes be quite slow to generate. It will take a little bit of tweaking to implement this nicely, since the first run would need to generate and cache/store all of the reference JSONs to make future loads much, much faster. |
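The "generate on first run, reuse afterwards" idea for reference JSONs can be illustrated with a small caching helper. This is a sketch of the caching step only and does not call kerchunk: `load_or_build_references` is a hypothetical name, and `build` stands in for whatever slow step produces the reference dictionary.

```python
import json
from pathlib import Path
from typing import Any, Callable, Dict


def load_or_build_references(
    cache_file: Path,
    build: Callable[[], Dict[str, Any]],
) -> Dict[str, Any]:
    """Return cached references if present, otherwise build and store them."""
    if cache_file.exists():
        # Fast path: a previous run already generated the references.
        return json.loads(cache_file.read_text())

    # Slow path: generate once, then persist for future loads.
    references = build()
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(json.dumps(references))
    return references
```

A real implementation would also need a way to invalidate the cache when the underlying files change, which this sketch leaves out.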
Hi everyone! The PR is fully implemented with testing and ready for review. #360 |
Of interest for people subscribed to this issue: https://guide.cloudnativegeo.org (source) |
I think this can be closed via #360! |
Closed in #360 indeed! 🚀 |
Description
Read and write netcdf file into Xarray.
https://xarray.pydata.org/en/stable/user-guide/io.html#netcdf
Context
Should attract the weather data science community to use kedro :)
Possible Implementation
Should be quite similar to pandas.CSVDataSet. I'll give this implementation a shot in my free time.
Has anyone ever implemented such a custom dataset already?