Skip to content

Commit

Permalink
Appending to zarr store (#2706)
Browse files Browse the repository at this point in the history
* Initial version of appending to zarr store

* Added docs

* Resolve PEP8 incompliances

* Added write and append test for mode 'a'

* Merged repaired master

* Resolved pep8 issue

* Put target store encoding in appended variable

* Rewrite test with appending along time dimension

* Add chunk_size parameter for rechunking appended coordinate

* Add chunk_dim test

* Add type check and tests for it.

In append mode storing any datatype apart from number
subtypes and fixed size strings raises and error.

* Add documentation

* Add test for compute=False and commented it out

* Remove python 3.7 string formatting

* Fix PEP8 incompliance

* Add missing whitespaces

* allowed for compute=False when appending to a zarr store

* Fixed empty array data error

When using create_append_test_data we used np.arange(...), which was incidently also
the default value of the zarr array when fill_value is set to None. So appending to the data
with compute=False and then expecting an error when asserting the source and target to be the same
failed the tests. Using random data passes the tests

* flake8 fixes

* removed chunk_dim argument to to_zarr function

* implemented requested changes

* Update xarray/backends/api.py

Co-Authored-By: Stephan Hoyer <shoyer@gmail.com>

* added contributors and example of using append to zarr

* fixed docs fail

* fixed docs

* removed unnecessary condition

* attempt at clean string encoding and variable length strings

* implemented suggestions

* * append_dim does not need to be specified if creating a new array with Dataset.to_zarr(store, mode='a')
* cleand up to_zarr append mode tests

* raise ValueError when append_dim is not a valid dimension

* flake8 fix

* removed unused comment

* * raise error when appending with encoding provided for existing variable
* add test for encoding consistency when appending
* implemented: #2706 (comment)
* refactored tests
  • Loading branch information
jendrikjoe authored and shoyer committed Jun 29, 2019
1 parent d30635c commit 18f35da
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 56 deletions.
23 changes: 23 additions & 0 deletions doc/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,29 @@ store is already present at that path, an error will be raised, preventing it
from being overwritten. To override this behavior and overwrite an existing
store, add ``mode='w'`` when invoking ``to_zarr``.

It is also possible to append to an existing store. For that, add ``mode='a'``
and set ``append_dim`` to the name of the dimension along which to append.

.. ipython:: python
:suppress:
! rm -rf path/to/directory.zarr
.. ipython:: python
ds1 = xr.Dataset({'foo': (('x', 'y', 't'), np.random.rand(4, 5, 2))},
coords={'x': [10, 20, 30, 40],
'y': [1,2,3,4,5],
't': pd.date_range('2001-01-01', periods=2)})
ds1.to_zarr('path/to/directory.zarr')
ds2 = xr.Dataset({'foo': (('x', 'y', 't'), np.random.rand(4, 5, 2))},
coords={'x': [10, 20, 30, 40],
'y': [1,2,3,4,5],
't': pd.date_range('2001-01-03', periods=2)})
ds2.to_zarr('path/to/directory.zarr', mode='a', append_dim='t')
To store variable length strings use ``dtype=object``.

To read back a zarr dataset that has been created this way, we use the
:py:func:`~xarray.open_zarr` method:

Expand Down
5 changes: 5 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ Other enhancements
report showing what exactly differs between the two objects (dimensions /
coordinates / variables / attributes) (:issue:`1507`).
By `Benoit Bovy <https://github.com/benbovy>`_.
- Added append capability to the zarr store.
By `Jendrik Jördening <https://github.com/jendrikjoe>`_,
`David Brochart <https://github.com/davidbrochart>`_,
`Ryan Abernathey <https://github.com/rabernat>`_ and
`Shikhar Goenka<https://github.com/shikharsg>`_.
- Resampling of standard and non-standard calendars indexed by
:py:class:`~xarray.CFTimeIndex` is now possible. (:issue:`2191`).
By `Jwen Fai Low <https://github.com/jwenfai>`_ and
Expand Down
57 changes: 54 additions & 3 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
from io import BytesIO
from numbers import Number
from pathlib import Path
import re

import numpy as np
import pandas as pd

from .. import Dataset, DataArray, backends, conventions
from .. import Dataset, DataArray, backends, conventions, coding
from ..core import indexing
from .. import auto_combine
from ..core.combine import (combine_by_coords, _nested_combine,
_infer_concat_order_from_positions)
from ..core.utils import close_on_error, is_grib_path, is_remote_uri
from ..core.variable import Variable
from .common import ArrayWriter
from .locks import _get_scheduler
from ..coding.variables import safe_setitem, unpack_for_encoding

DATAARRAY_NAME = '__xarray_dataarray_name__'
DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'
Expand Down Expand Up @@ -1024,8 +1028,48 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
for w, s in zip(writes, stores)])


def _validate_datatypes_for_zarr_append(dataset):
"""DataArray.name and Dataset keys must be a string or None"""
def check_dtype(var):
if (not np.issubdtype(var.dtype, np.number)
and not coding.strings.is_unicode_dtype(var.dtype)
and not var.dtype == object):
# and not re.match('^bytes[1-9]+$', var.dtype.name)):
raise ValueError('Invalid dtype for data variable: {} '
'dtype must be a subtype of number, '
'a fixed sized string, a fixed size '
'unicode string or an object'.format(var))
for k in dataset.data_vars.values():
check_dtype(k)


def _validate_append_dim_and_encoding(ds_to_append, store, append_dim,
encoding, **open_kwargs):
try:
ds = backends.zarr.open_zarr(store, **open_kwargs)
except ValueError: # store empty
return
if append_dim:
if append_dim not in ds.dims:
raise ValueError(
"{} not a valid dimension in the Dataset".format(append_dim)
)
for data_var in ds_to_append:
if data_var in ds:
if append_dim is None:
raise ValueError(
"variable '{}' already exists, but append_dim "
"was not set".format(data_var)
)
if data_var in encoding.keys():
raise ValueError(
"variable '{}' already exists, but encoding was"
"provided".format(data_var)
)


def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
encoding=None, compute=True, consolidated=False):
encoding=None, compute=True, consolidated=False, append_dim=None):
"""This function creates an appropriate datastore for writing a dataset to
a zarr ztore
Expand All @@ -1040,11 +1084,18 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
_validate_dataset_names(dataset)
_validate_attrs(dataset)

if mode == "a":
_validate_datatypes_for_zarr_append(dataset)
_validate_append_dim_and_encoding(dataset, store, append_dim,
group=group,
consolidated=consolidated,
encoding=encoding)

zstore = backends.ZarrStore.open_group(store=store, mode=mode,
synchronizer=synchronizer,
group=group,
consolidate_on_close=consolidated)

zstore.append_dim = append_dim
writer = ArrayWriter()
# TODO: figure out how to properly handle unlimited_dims
dump_to_store(dataset, zstore, writer, encoding=encoding)
Expand Down
13 changes: 10 additions & 3 deletions xarray/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,26 +158,33 @@ class ArrayWriter:
def __init__(self, lock=None):
self.sources = []
self.targets = []
self.regions = []
self.lock = lock

def add(self, source, target):
def add(self, source, target, region=None):
if isinstance(source, dask_array_type):
self.sources.append(source)
self.targets.append(target)
self.regions.append(region)
else:
target[...] = source
if region:
target[region] = source
else:
target[...] = source

def sync(self, compute=True):
if self.sources:
import dask.array as da
# TODO: consider wrapping targets with dask.delayed, if this makes
# for any discernable difference in perforance, e.g.,
# targets = [dask.delayed(t) for t in self.targets]

delayed_store = da.store(self.sources, self.targets,
lock=self.lock, compute=compute,
flush=True)
flush=True, regions=self.regions)
self.sources = []
self.targets = []
self.regions = []
return delayed_store


Expand Down
148 changes: 116 additions & 32 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from ..core import indexing
from ..core.pycompat import integer_types
from ..core.utils import FrozenOrderedDict, HiddenKeyDict
from .common import AbstractWritableDataStore, BackendArray
from .common import AbstractWritableDataStore, BackendArray, \
_encode_variable_name

# need some special secret attributes to tell us the dimensions
_DIMENSION_KEY = '_ARRAY_DIMENSIONS'
Expand Down Expand Up @@ -212,7 +213,7 @@ def encode_zarr_variable(var, needs_copy=True, name=None):
# zarr allows unicode, but not variable-length strings, so it's both
# simpler and more compact to always encode as UTF-8 explicitly.
# TODO: allow toggling this explicitly via dtype in encoding.
coder = coding.strings.EncodedStringCoder(allows_unicode=False)
coder = coding.strings.EncodedStringCoder(allows_unicode=True)
var = coder.encode(var, name=name)
var = coding.strings.ensure_fixed_length_bytes(var)

Expand Down Expand Up @@ -257,6 +258,7 @@ def __init__(self, zarr_group, consolidate_on_close=False):
self._synchronizer = self.ds.synchronizer
self._group = self.ds.path
self._consolidate_on_close = consolidate_on_close
self.append_dim = None

def open_store_variable(self, name, zarr_array):
data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self))
Expand Down Expand Up @@ -313,40 +315,122 @@ def encode_variable(self, variable):
def encode_attribute(self, a):
return _encode_zarr_attr_value(a)

def prepare_variable(self, name, variable, check_encoding=False,
unlimited_dims=None):

attrs = variable.attrs.copy()
dims = variable.dims
dtype = variable.dtype
shape = variable.shape

fill_value = attrs.pop('_FillValue', None)
if variable.encoding == {'_FillValue': None} and fill_value is None:
variable.encoding = {}

encoding = _extract_zarr_variable_encoding(
variable, raise_on_invalid=check_encoding)

encoded_attrs = OrderedDict()
# the magic for storing the hidden dimension data
encoded_attrs[_DIMENSION_KEY] = dims
for k, v in attrs.items():
encoded_attrs[k] = self.encode_attribute(v)

zarr_array = self.ds.create(name, shape=shape, dtype=dtype,
fill_value=fill_value, **encoding)
zarr_array.attrs.put(encoded_attrs)

return zarr_array, variable.data

def store(self, variables, attributes, *args, **kwargs):
AbstractWritableDataStore.store(self, variables, attributes,
*args, **kwargs)
def store(self, variables, attributes, check_encoding_set=frozenset(),
writer=None, unlimited_dims=None):
"""
Top level method for putting data on this store, this method:
- encodes variables/attributes
- sets dimensions
- sets variables
Parameters
----------
variables : dict-like
Dictionary of key/value (variable name / xr.Variable) pairs
attributes : dict-like
Dictionary of key/value (attribute name / attribute) pairs
check_encoding_set : list-like
List of variables that should be checked for invalid encoding
values
writer : ArrayWriter
unlimited_dims : list-like
List of dimension names that should be treated as unlimited
dimensions.
dimension on which the zarray will be appended
only needed in append mode
"""

existing_variables = set([vn for vn in variables
if _encode_variable_name(vn) in self.ds])
new_variables = set(variables) - existing_variables
variables_without_encoding = OrderedDict([(vn, variables[vn])
for vn in new_variables])
variables_encoded, attributes = self.encode(
variables_without_encoding, attributes)

if len(existing_variables) > 0:
# there are variables to append
# their encoding must be the same as in the store
ds = open_zarr(self.ds.store, chunks=None)
variables_with_encoding = OrderedDict()
for vn in existing_variables:
variables_with_encoding[vn] = variables[vn].copy(deep=False)
variables_with_encoding[vn].encoding = ds[vn].encoding
variables_with_encoding, _ = self.encode(variables_with_encoding,
{})
variables_encoded.update(variables_with_encoding)

self.set_attributes(attributes)
self.set_dimensions(variables_encoded, unlimited_dims=unlimited_dims)
self.set_variables(variables_encoded, check_encoding_set, writer,
unlimited_dims=unlimited_dims)

def sync(self):
pass

def set_variables(self, variables, check_encoding_set, writer,
unlimited_dims=None):
"""
This provides a centralized method to set the variables on the data
store.
Parameters
----------
variables : dict-like
Dictionary of key/value (variable name / xr.Variable) pairs
check_encoding_set : list-like
List of variables that should be checked for invalid encoding
values
writer :
unlimited_dims : list-like
List of dimension names that should be treated as unlimited
dimensions.
"""

for vn, v in variables.items():
name = _encode_variable_name(vn)
check = vn in check_encoding_set
attrs = v.attrs.copy()
dims = v.dims
dtype = v.dtype
shape = v.shape

fill_value = attrs.pop('_FillValue', None)
if v.encoding == {'_FillValue': None} and fill_value is None:
v.encoding = {}
if name in self.ds:
zarr_array = self.ds[name]
if self.append_dim in dims:
# this is the DataArray that has append_dim as a
# dimension
append_axis = dims.index(self.append_dim)
new_shape = list(zarr_array.shape)
new_shape[append_axis] += v.shape[append_axis]
new_region = [slice(None)] * len(new_shape)
new_region[append_axis] = slice(
zarr_array.shape[append_axis],
None
)
zarr_array.resize(new_shape)
writer.add(v.data, zarr_array,
region=tuple(new_region))
else:
# new variable
encoding = _extract_zarr_variable_encoding(
v, raise_on_invalid=check)
encoded_attrs = OrderedDict()
# the magic for storing the hidden dimension data
encoded_attrs[_DIMENSION_KEY] = dims
for k2, v2 in attrs.items():
encoded_attrs[k2] = self.encode_attribute(v2)

if coding.strings.check_vlen_dtype(dtype) == str:
dtype = str
zarr_array = self.ds.create(name, shape=shape, dtype=dtype,
fill_value=fill_value, **encoding)
zarr_array.attrs.put(encoded_attrs)
writer.add(v.data, zarr_array)

def close(self):
if self._consolidate_on_close:
import zarr
Expand Down
Loading

0 comments on commit 18f35da

Please sign in to comment.