Skip to content

Commit

Permalink
Use meta for data proxies
Browse files · Browse the repository at this point in the history
  • Loading branch information
bouweandela committed Apr 16, 2024
1 parent 9497443 commit 0d1fc27
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 36 deletions.
23 changes: 10 additions & 13 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ def _optimum_chunksize(
)


def as_lazy_data(
data, chunks=None, asarray=False, meta=None, dims_fixed=None, dask_chunking=False
):
def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
"""Convert the input array `data` to a :class:`dask.array.Array`.
Parameters
Expand All @@ -233,6 +231,8 @@ def as_lazy_data(
This will be converted to a :class:`dask.array.Array`.
chunks : list of int, optional
If present, a source chunk shape, e.g. for a chunked netcdf variable.
If set to "auto", Iris chunking optimisation will be bypassed, and dask's
default chunking will be used instead.
asarray : bool, default=False
If True, then chunks will be converted to instances of `ndarray`.
Set to False (default) to pass passed chunks through unchanged.
Expand All @@ -243,10 +243,6 @@ def as_lazy_data(
If set, a list of values equal in length to 'chunks' or data.ndim.
'True' values indicate a dimension which can not be changed, i.e. the
result for that index must equal the value in 'chunks' or data.shape.
dask_chunking : bool, default=False
If True, Iris chunking optimisation will be bypassed, and dask's default
chunking will be used instead. Including a value for chunks while dask_chunking
is set to True will result in a failure.
Returns
-------
Expand All @@ -265,12 +261,13 @@ def as_lazy_data(
if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)

if dask_chunking:
if chunks is not None:
raise ValueError(
f"Dask chunking chosen, but chunks already assigned value {chunks}"
)
else:
if meta is None and not isinstance(data, (np.ndarray, da.Array)):
raise ValueError(
"For performance reasons, `meta` cannot be `None` if `data` is "
"anything other than a Numpy or Dask array."
)

if chunks != "auto":
if chunks is None:
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
Expand Down
4 changes: 4 additions & 0 deletions lib/iris/fileformats/netcdf/_thread_safe_nc.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ def ndim(self):
# noqa: D102
return len(self.shape)

@property
def meta(self):
    """Return an empty masked array matching this variable's ndim and dtype.

    Serves as the ``meta`` template passed to ``as_lazy_data`` when this
    proxy is wrapped as a dask array (masked, since netCDF data may be
    masked on read).
    """
    empty_shape = (0,) * self.ndim
    template = np.empty(empty_shape, dtype=self.dtype)
    return np.ma.array(template, mask=True)

def __getitem__(self, keys):
# Using a DatasetWrapper causes problems with invalid ID's and the
# netCDF4 library, presumably because __getitem__ gets called so many
Expand Down
7 changes: 2 additions & 5 deletions lib/iris/fileformats/netcdf/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,8 @@ def _get_cf_var_data(cf_var, filename):
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
meta = np.ma.array(
np.empty((0,) * proxy.ndim, dtype=proxy.dtype), mask=True
)
if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK:
result = as_lazy_data(proxy, meta=meta, chunks=None, dask_chunking=True)
result = as_lazy_data(proxy, meta=proxy.meta, chunks="auto")
else:
chunks = cf_var.cf_data.chunking()
if chunks is None:
Expand Down Expand Up @@ -288,7 +285,7 @@ def _get_cf_var_data(cf_var, filename):
if dims_fixed is None:
dims_fixed = [dims_fixed]
result = as_lazy_data(
proxy, meta=meta, chunks=chunks, dims_fixed=tuple(dims_fixed)
proxy, meta=proxy.meta, chunks=chunks, dims_fixed=tuple(dims_fixed)
)
return result

Expand Down
7 changes: 5 additions & 2 deletions lib/iris/fileformats/pp.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ def fill_value(self):
def ndim(self):
return len(self.shape)

@property
def meta(self):
    """Return an empty (unmasked) array matching this proxy's ndim and dtype.

    Serves as the ``meta`` template passed to ``as_lazy_data`` when this
    proxy is wrapped as a dask array.
    """
    empty_shape = tuple(0 for _ in range(self.ndim))
    return np.empty(empty_shape, dtype=self.dtype)

def __getitem__(self, keys):
with open(self.path, "rb") as pp_file:
pp_file.seek(self.offset, os.SEEK_SET)
Expand Down Expand Up @@ -1756,8 +1760,7 @@ def _create_field_data(field, data_shape, land_mask_field=None):
if land_mask_field is None:
# For a "normal" (non-landsea-masked) field, the proxy can be
# wrapped directly as a deferred array.
meta = np.empty((0,) * proxy.ndim, dtype=proxy.dtype)
field.data = as_lazy_data(proxy, meta=meta, chunks=block_shape)
field.data = as_lazy_data(proxy, meta=proxy.meta, chunks=block_shape)
else:
# This is a landsea-masked field, and its data must be handled in
# a different way : Because data shape/size is not known in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def test_as_dask(tmp_filepath, save_cubelist_with_sigma):
except RuntimeError as e:
if str(e) != message:
raise e
as_lazy_data.assert_called_with(ANY, chunks=None, dask_chunking=True)
as_lazy_data.assert_called_with(ANY, meta=ANY, chunks="auto")


def test_pinned_optimisation(tmp_filepath, save_cubelist_with_sigma):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_deferred_bytes(self):
data_shape = (100, 120)
proxy = mock.Mock(
dtype=np.dtype("f4"),
meta=np.empty((0,) * len(data_shape), dtype=np.dtype("f4")),
shape=data_shape,
spec=pp.PPDataProxy,
ndim=len(data_shape),
Expand Down
6 changes: 4 additions & 2 deletions lib/iris/tests/unit/lazy_data/test_as_concrete_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def test_lazy_mask_data(self):
def test_lazy_scalar_proxy(self):
a = np.array(5)
proxy = MyProxy(a)
lazy_array = as_lazy_data(proxy)
meta = np.empty((0,) * proxy.ndim, dtype=proxy.dtype)
lazy_array = as_lazy_data(proxy, meta=meta)
self.assertTrue(is_lazy_data(lazy_array))
result = as_concrete_data(lazy_array)
self.assertFalse(is_lazy_data(result))
Expand All @@ -69,7 +70,8 @@ def test_lazy_scalar_proxy(self):
def test_lazy_scalar_proxy_masked(self):
a = np.ma.masked_array(5, True)
proxy = MyProxy(a)
lazy_array = as_lazy_data(proxy)
meta = np.ma.array(np.empty((0,) * proxy.ndim, dtype=proxy.dtype), mask=True)
lazy_array = as_lazy_data(proxy, meta=meta)
self.assertTrue(is_lazy_data(lazy_array))
result = as_concrete_data(lazy_array)
self.assertFalse(is_lazy_data(result))
Expand Down
26 changes: 14 additions & 12 deletions lib/iris/tests/unit/lazy_data/test_as_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,28 @@ def test_dask_chunking(self):
chunks = (12,)
optimum = self.patch("iris._lazy_data._optimum_chunksize")
optimum.return_value = chunks
_ = as_lazy_data(data, chunks=None, dask_chunking=True)
_ = as_lazy_data(data, chunks="auto")
self.assertFalse(optimum.called)

def test_dask_chunking_error(self):
    """Supplying explicit chunks together with dask_chunking=True must raise."""
    preset_chunks = (12,)
    # Patch the optimiser so the test does not depend on its real behaviour.
    optimiser = self.patch("iris._lazy_data._optimum_chunksize")
    optimiser.return_value = preset_chunks
    expected = r"Dask chunking chosen, but chunks already assigned value"
    with self.assertRaisesRegex(ValueError, expected):
        as_lazy_data(np.arange(24), chunks=preset_chunks, dask_chunking=True)

def test_with_masked_constant(self):
    """A masked-constant scalar input is still converted to a dask array."""
    # Indexing a fully-masked 1-d array yields a MaskedConstant.
    masked_constant = ma.masked_array([8], mask=True)[0]
    result = as_lazy_data(masked_constant)
    self.assertIsInstance(result, da.core.Array)

def test_missing_meta(self):
    """Omitting `meta` for an input that is neither numpy nor dask raises."""

    class _OpaqueProxy:
        # Deliberately not a numpy/dask array, and provides no meta.
        pass

    expected = (
        r"`meta` cannot be `None` if `data` is anything other than a Numpy "
        r"or Dask array."
    )
    with self.assertRaisesRegex(ValueError, expected):
        as_lazy_data(_OpaqueProxy())


class Test__optimised_chunks(tests.IrisTest):
# Stable, known chunksize for testing.
Expand Down
3 changes: 2 additions & 1 deletion lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self, array):
self.ndim = array.ndim
self._array = array
self.access_count = 0
self.meta = np.empty((0,) * array.ndim, dtype=array.dtype)

def __getitem__(self, keys):
self.access_count += 1
Expand Down Expand Up @@ -55,7 +56,7 @@ def test_multi(self):

def test_combined_access(self):
wrapped_array = ArrayAccessCounter(np.arange(3.0))
lazy_array = as_lazy_data(wrapped_array)
lazy_array = as_lazy_data(wrapped_array, meta=wrapped_array.meta)
derived_a = lazy_array + 1
derived_b = lazy_array + 2
derived_c = lazy_array + 3
Expand Down

0 comments on commit 0d1fc27

Please sign in to comment.