diff --git a/ci/requirements-py36.yml b/ci/requirements-py36.yml
index fd63fe26130..fc272984237 100644
--- a/ci/requirements-py36.yml
+++ b/ci/requirements-py36.yml
@@ -21,8 +21,10 @@ dependencies:
   - bottleneck
   - zarr
   - pseudonetcdf>=3.0.1
+  - eccodes
   - pip:
     - coveralls
     - pytest-cov
     - pydap
     - lxml
+    - cfgrib
diff --git a/doc/installing.rst b/doc/installing.rst
index eb74eb7162b..64751eea637 100644
--- a/doc/installing.rst
+++ b/doc/installing.rst
@@ -34,7 +34,9 @@ For netCDF and IO
 - `rasterio <https://github.com/mapbox/rasterio>`__: for reading GeoTiffs and
   other gridded raster datasets.
 - `iris <http://scitools.org.uk/iris>`__: for conversion to and from iris'
   Cube objects.
+- `cfgrib <https://github.com/ecmwf/cfgrib>`__: for reading GRIB files via the
+  *ECMWF ecCodes* library.
 
 For accelerating xarray
 ~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/io.rst b/doc/io.rst
index 093ee773e15..e841e665308 100644
--- a/doc/io.rst
+++ b/doc/io.rst
@@ -635,6 +635,28 @@ For example:
 Not all native zarr compression and filtering options have been tested with
 xarray.
 
+.. _io.cfgrib:
+
+GRIB format via cfgrib
+----------------------
+
+xarray supports reading GRIB files via the ECMWF cfgrib_ Python driver and the
+ecCodes_ C-library, if they are installed. To open a GRIB file, supply
+``engine='cfgrib'`` to :py:func:`~xarray.open_dataset`:
+
+.. ipython::
+    :verbatim:
+
+    In [1]: ds_grib = xr.open_dataset('example.grib', engine='cfgrib')
+
+We recommend installing ecCodes via conda and cfgrib via pip::
+
+    conda install -c conda-forge eccodes
+    pip install cfgrib
+
+.. _cfgrib: https://github.com/ecmwf/cfgrib
+.. _ecCodes: https://confluence.ecmwf.int/display/ECC/ecCodes+Home
+
 .. _io.pynio:
 
 Formats supported by PyNIO
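The documentation above shows only plain ``open_dataset`` usage; cfgrib-specific options such as ``filter_by_keys`` can also be forwarded through ``backend_kwargs``, as the tests added below exercise. A minimal sketch (the file name and the ``shortName`` key are taken from this PR's test data)::

    import xarray as xr

    # Keep only the temperature messages of a multi-message GRIB file;
    # backend_kwargs are passed straight through to cfgrib.open_file().
    ds = xr.open_dataset(
        'example.grib', engine='cfgrib',
        backend_kwargs={'filter_by_keys': {'shortName': 't'}})
    print(list(ds.data_vars))  # ['t'], per test_read_filter_by_keys below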
diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index de7e6c8f6ff..61da801badb 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -72,7 +72,11 @@ Enhancements
   :py:meth:`~xarray.DataArray.interp`, and
   :py:meth:`~xarray.Dataset.interp`.
   By `Spencer Clark <https://github.com/spencerkclark>`_.
-
+- Added a new backend for the GRIB file format based on the ECMWF *cfgrib*
+  Python driver and the *ecCodes* C-library. (:issue:`2475`)
+  By `Alessandro Amici <https://github.com/alexamici>`_,
+  sponsored by `ECMWF <https://ecmwf.int/>`_.
+
 
 Bug fixes
 ~~~~~~~~~
diff --git a/xarray/backends/__init__.py b/xarray/backends/__init__.py
index a2f0d79a6d1..9b9e04d9346 100644
--- a/xarray/backends/__init__.py
+++ b/xarray/backends/__init__.py
@@ -5,6 +5,7 @@
 """
 from .common import AbstractDataStore
 from .file_manager import FileManager, CachingFileManager, DummyFileManager
+from .cfgrib_ import CfGribDataStore
 from .memory import InMemoryDataStore
 from .netCDF4_ import NetCDF4DataStore
 from .pydap_ import PydapDataStore
@@ -18,6 +19,7 @@
     'AbstractDataStore',
     'FileManager',
     'CachingFileManager',
+    'CfGribDataStore',
     'DummyFileManager',
     'InMemoryDataStore',
     'NetCDF4DataStore',
diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index 65112527045..3fb7338e171 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -162,7 +162,8 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
     decode_coords : bool, optional
         If True, decode the 'coordinates' attribute to identify coordinates in
         the resulting dataset.
-    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'pseudonetcdf'}, optional
+    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib',
+        'pseudonetcdf'}, optional
         Engine to use when reading files. If not provided, the default engine
         is chosen based on available dependencies, with a preference for
         'netcdf4'.
@@ -296,6 +297,9 @@ def maybe_decode_store(store, lock=False):
         elif engine == 'pseudonetcdf':
             store = backends.PseudoNetCDFDataStore.open(
                 filename_or_obj, lock=lock, **backend_kwargs)
+        elif engine == 'cfgrib':
+            store = backends.CfGribDataStore(
+                filename_or_obj, lock=lock, **backend_kwargs)
         else:
             raise ValueError('unrecognized engine for open_dataset: %r'
                              % engine)
@@ -356,7 +360,8 @@ def open_dataarray(filename_or_obj, group=None, decode_cf=True,
     decode_coords : bool, optional
         If True, decode the 'coordinates' attribute to identify coordinates in
         the resulting dataset.
-    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio'}, optional
+    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib'},
+        optional
         Engine to use when reading files. If not provided, the default engine
         is chosen based on available dependencies, with a preference for
         'netcdf4'.
@@ -486,7 +491,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
         of all non-null values.
     preprocess : callable, optional
         If provided, call this function on each dataset prior to concatenation.
-    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio'}, optional
+    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib'},
+        optional
         Engine to use when reading files. If not provided, the default engine
         is chosen based on available dependencies, with a preference for
         'netcdf4'.
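Besides the string-engine dispatch added above, ``open_dataset`` also accepts an already-constructed ``AbstractDataStore``, so the new store class defined next is usable directly; a sketch, equivalent to ``engine='cfgrib'`` and assuming the bundled ``example.grib`` test file::

    import xarray as xr
    from xarray.backends import CfGribDataStore

    # Keyword arguments go straight to cfgrib.open_file();
    # open_dataset wraps the store with CF decoding as usual.
    store = CfGribDataStore('example.grib')
    ds = xr.open_dataset(store)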
diff --git a/xarray/backends/cfgrib_.py b/xarray/backends/cfgrib_.py
new file mode 100644
index 00000000000..c0a7c025606
--- /dev/null
+++ b/xarray/backends/cfgrib_.py
@@ -0,0 +1,78 @@
+from __future__ import absolute_import, division, print_function
+
+import numpy as np
+
+from .. import Variable
+from ..core import indexing
+from ..core.utils import Frozen, FrozenOrderedDict
+from .common import AbstractDataStore, BackendArray
+from .locks import ensure_lock, SerializableLock
+
+# FIXME: Add a dedicated lock, even if ecCodes is supposed to be thread-safe
+# in most circumstances. See:
+# https://confluence.ecmwf.int/display/ECC/Frequently+Asked+Questions
+ECCODES_LOCK = SerializableLock()
+
+
+class CfGribArrayWrapper(BackendArray):
+    def __init__(self, datastore, array):
+        self.datastore = datastore
+        self.shape = array.shape
+        self.dtype = array.dtype
+        self.array = array
+
+    def __getitem__(self, key):
+        return indexing.explicit_indexing_adapter(
+            key, self.shape, indexing.IndexingSupport.OUTER, self._getitem)
+
+    def _getitem(self, key):
+        with self.datastore.lock:
+            return self.array[key]
+
+
+class CfGribDataStore(AbstractDataStore):
+    """
+    Implements the ``xr.AbstractDataStore`` read-only API for a GRIB file.
+    """
+    def __init__(self, filename, lock=None, **backend_kwargs):
+        import cfgrib
+        if lock is None:
+            lock = ECCODES_LOCK
+        self.lock = ensure_lock(lock)
+
+        # NOTE: filter_by_keys is a dict, but CachingFileManager only accepts
+        # hashable types.
+        if 'filter_by_keys' in backend_kwargs:
+            filter_by_keys_items = backend_kwargs['filter_by_keys'].items()
+            backend_kwargs['filter_by_keys'] = tuple(filter_by_keys_items)
+
+        self.ds = cfgrib.open_file(filename, mode='r', **backend_kwargs)
+
+    def open_store_variable(self, name, var):
+        if isinstance(var.data, np.ndarray):
+            data = var.data
+        else:
+            wrapped_array = CfGribArrayWrapper(self, var.data)
+            data = indexing.LazilyOuterIndexedArray(wrapped_array)
+
+        encoding = self.ds.encoding.copy()
+        encoding['original_shape'] = var.data.shape
+
+        return Variable(var.dimensions, data, var.attributes, encoding)
+
+    def get_variables(self):
+        return FrozenOrderedDict((k, self.open_store_variable(k, v))
+                                 for k, v in self.ds.variables.items())
+
+    def get_attrs(self):
+        return Frozen(self.ds.attributes)
+
+    def get_dimensions(self):
+        return Frozen(self.ds.dimensions)
+
+    def get_encoding(self):
+        dims = self.get_dimensions()
+        encoding = {
+            'unlimited_dims': {k for k, v in dims.items() if v is None},
+        }
+        return encoding
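A short illustration of the ``filter_by_keys`` conversion in ``CfGribDataStore.__init__``: a dict cannot be hashed, while a tuple of its items can be, and it round-trips losslessly::

    filter_by_keys = {'shortName': 't'}
    # hash(filter_by_keys) would raise TypeError: unhashable type: 'dict'
    hashable = tuple(filter_by_keys.items())  # (('shortName', 't'),)
    assert dict(hashable) == filter_by_keys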
diff --git a/xarray/tests/__init__.py b/xarray/tests/__init__.py
index 5f724dd6713..56ecfa30c4d 100644
--- a/xarray/tests/__init__.py
+++ b/xarray/tests/__init__.py
@@ -77,6 +77,7 @@ def LooseVersion(vstring):
 has_zarr, requires_zarr = _importorskip('zarr', minversion='2.2')
 has_np113, requires_np113 = _importorskip('numpy', minversion='1.13.0')
 has_iris, requires_iris = _importorskip('iris')
+has_cfgrib, requires_cfgrib = _importorskip('cfgrib')
 
 # some special cases
 has_scipy_or_netCDF4 = has_scipy or has_netCDF4
diff --git a/xarray/tests/data/example.grib b/xarray/tests/data/example.grib
new file mode 100644
index 00000000000..596a54d98a0
Binary files /dev/null and b/xarray/tests/data/example.grib differ
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index 75aaba718c8..a274b41c424 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -33,7 +33,7 @@
     has_dask, has_netCDF4, has_scipy, network, raises_regex, requires_cftime,
     requires_dask, requires_h5netcdf, requires_netCDF4, requires_pathlib,
     requires_pseudonetcdf, requires_pydap, requires_pynio, requires_rasterio,
-    requires_scipy, requires_scipy_or_netCDF4, requires_zarr)
+    requires_scipy, requires_scipy_or_netCDF4, requires_zarr, requires_cfgrib)
 from .test_dataset import create_test_data
 
 try:
@@ -2463,6 +2463,28 @@ def test_weakrefs(self):
         assert_identical(actual, expected)
 
 
+@requires_cfgrib
+class TestCfGrib(object):
+
+    def test_read(self):
+        expected = {'number': 2, 'time': 3, 'air_pressure': 2, 'latitude': 3,
+                    'longitude': 4}
+        with open_example_dataset('example.grib', engine='cfgrib') as ds:
+            assert ds.dims == expected
+            assert list(ds.data_vars) == ['z', 't']
+            assert ds['z'].min() == 12660.
+
+    def test_read_filter_by_keys(self):
+        kwargs = {'filter_by_keys': {'shortName': 't'}}
+        expected = {'number': 2, 'time': 3, 'air_pressure': 2, 'latitude': 3,
+                    'longitude': 4}
+        with open_example_dataset('example.grib', engine='cfgrib',
+                                  backend_kwargs=kwargs) as ds:
+            assert ds.dims == expected
+            assert list(ds.data_vars) == ['t']
+            assert ds['t'].min() == 231.
+
+
 @requires_pseudonetcdf
 @pytest.mark.filterwarnings('ignore:IOAPI_ISPH is assumed to be 6370000')
 class TestPseudoNetCDFFormat(object):
diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py
index 7c77a62d3c9..1837a0fe4ef 100644
--- a/xarray/tests/test_distributed.py
+++ b/xarray/tests/test_distributed.py
@@ -20,12 +20,13 @@
 import xarray as xr
 from xarray.backends.locks import HDF5_LOCK, CombinedLock
 from xarray.tests.test_backends import (ON_WINDOWS, create_tmp_file,
-                                        create_tmp_geotiff)
+                                        create_tmp_geotiff,
+                                        open_example_dataset)
 from xarray.tests.test_dataset import create_test_data
 
 from . import (
     assert_allclose, has_h5netcdf, has_netCDF4, requires_rasterio, has_scipy,
-    requires_zarr, raises_regex)
+    requires_zarr, requires_cfgrib, raises_regex)
 
 # this is to stop isort throwing errors. May have been easier to just use
 # `isort:skip` in retrospect
@@ -142,6 +143,20 @@ def test_dask_distributed_rasterio_integration_test(loop):
             assert_allclose(actual, expected)
 
 
+@requires_cfgrib
+def test_dask_distributed_cfgrib_integration_test(loop):
+    with cluster() as (s, [a, b]):
+        with Client(s['address'], loop=loop) as c:
+            with open_example_dataset('example.grib',
+                                      engine='cfgrib',
+                                      chunks={'time': 1}) as ds:
+                with open_example_dataset('example.grib',
+                                          engine='cfgrib') as expected:
+                    assert isinstance(ds['t'].data, da.Array)
+                    actual = ds.compute()
+                    assert_allclose(actual, expected)
+
+
 @pytest.mark.skipif(distributed.__version__ <= '1.19.3',
                     reason='Need recent distributed version to clean up get')
 @gen_cluster(client=True, timeout=None)
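For reference, the access pattern that the distributed test exercises, written as a plain dask example (a sketch: it assumes cfgrib and dask are installed and uses this PR's ``example.grib`` and its ``t`` variable)::

    import xarray as xr

    # chunks= turns each variable into a dask array; the GRIB reads
    # happen lazily at compute time, serialized by ECCODES_LOCK.
    ds = xr.open_dataset('example.grib', engine='cfgrib', chunks={'time': 1})
    t_mean = ds['t'].mean('time').compute()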