Skip to content

Commit

Permalink
Merge pull request #373 from davidhassell/dask-all-any
Browse files Browse the repository at this point in the history
dask: `Data.all` and `Data.any`
  • Loading branch information
davidhassell authored Apr 29, 2022
2 parents 38d99de + 62e83d4 commit cd4dc74
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 92 deletions.
8 changes: 8 additions & 0 deletions cf/data/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,11 @@ Inheritance from `cfdm`

Generally, how do we deal with optimisation for objects and logic inherited
from `cfdm`, since the current plan is not to Daskify `cfdm.Data`?

Returned Booleans
-----------------

When a method currently returns a Boolean (such as `Data.all`), should
it in fact return a lazy size 1 `Data` object? The numpy and dask
`all` functions have an "axis" keyword that allows non-scalar outputs,
and a "keepdims" keyword.
167 changes: 102 additions & 65 deletions cf/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ def __bool__(self):
"elements is ambiguous. Use d.any() or d.all()"
)

return bool(self.array)
return bool(self._get_dask())

def __repr__(self):
"""Called by the `repr` built-in function.
Expand Down Expand Up @@ -3732,6 +3732,9 @@ def _set_subspace(self, *args, **kwargs):
"""'cf.Data._set_subspace' is unavailable."""
raise NotImplementedError("'cf.Data._set_subspace' is unavailable.")

def _parse_indices(self, *args, **kwargs):
    """'cf.Data._parse_indices' is unavailable.

    Always raises `NotImplementedError`; the error message directs
    callers to `cf.parse_indices` instead.

    """
    raise NotImplementedError("Use cf.parse_indices instead")

@classmethod
def concatenate(cls, data, axis=0, _preserve=True):
"""Join a sequence of data arrays together.
Expand Down Expand Up @@ -5233,14 +5236,14 @@ def mask(self):
(12, 73, 96)
"""
mask_data_obj = self.copy()
mask_data_obj = self.copy(array=False)

dx = self.to_dask_array()
mask = da.ma.getmaskarray(dx)

mask_data_obj._set_dask(mask, reset_mask_hardness=False)
mask_data_obj.override_units(_units_None, inplace=True)
mask_data_obj.hardmask = True
mask_data_obj.hardmask = _DEFAULT_HARDMASK

return mask_data_obj

Expand Down Expand Up @@ -5604,56 +5607,70 @@ def arccosh(self, inplace=False):

return d

def all(self):
@daskified(_DASKIFIED_VERBOSE)
def all(self, axis=None, keepdims=True, split_every=None):
"""Test whether all data array elements evaluate to True.
Performs a logical ``and`` over the data array and returns the
result. Masked values are considered as True during computation.
.. seealso:: `allclose`, `any`, `isclose`
:Parameters:
axis: (sequence of) `int`, optional
Axis or axes along which a logical AND reduction is
performed. The default (`None`) is to perform a
logical AND over all the dimensions of the input
array. *axis* may be negative, in which case it counts
from the last to the first axis.
{{collapse keepdims: `bool`, optional}}
{{split_every: `int` or `dict`, optional}}
:Returns:
`bool`
`Data`
Whether or not all data array elements evaluate to True.
**Examples**
>>> d = cf.Data([[1, 3, 2]])
>>> print(d.array)
[[1 3 2]]
>>> d.all()
True
>>> d[0, 2] = cf.masked
>>> print(d.array)
[[1 3 --]]
>>> d = cf.Data([[1, 2], [3, 4]])
>>> d.all()
True
>>> d[0, 0] = 0
<CF Data(1, 1): [[True]]>
>>> d.all(keepdims=False)
<CF Data(): True>
>>> d.all(axis=0)
<CF Data(1, 2): [[True, True]]>
>>> d.all(axis=1)
<CF Data(2, 1): [[True, True]]>
>>> d.all(axis=())
<CF Data(2, 2): [[True, ..., True]]>
>>> d[0] = cf.masked
>>> d[1, 0] = 0
>>> print(d.array)
[[0 3 --]]
>>> d.all()
False
[[-- --]
[0 4]]
>>> d.all(axis=0)
<CF Data(1, 2): [[False, True]]>
>>> d.all(axis=1)
<CF Data(2, 1): [[--, False]]>
>>> d[...] = cf.masked
>>> print(d.array)
[[-- -- --]]
>>> d.all()
<CF Data(1, 1): [[--]]>
>>> bool(d.all())
True
>>> bool(d.all(keepdims=False))
False
"""
config = self.partition_configuration(readonly=True)

for partition in self.partitions.matrix.flat:
partition.open(config)
array = partition.array
a = array.all()
if not a and a is not np.ma.masked:
partition.close()
return False

partition.close()

return True
d = self.copy(array=False)
dx = self._get_dask()
dx = da.all(dx, axis=axis, keepdims=keepdims, split_every=split_every)
d._set_dask(dx, reset_mask_hardness=False)
d.hardmask = _DEFAULT_HARDMASK
d.override_units(_units_None, inplace=True)
return d

def allclose(self, y, rtol=None, atol=None):
"""Returns True if two broadcastable arrays have equal values,
Expand Down Expand Up @@ -5709,48 +5726,68 @@ def allclose(self, y, rtol=None, atol=None):
"""
return self.isclose(y, rtol=rtol, atol=atol).all()

def any(self):
def any(self, axis=None, keepdims=True, split_every=None):
"""Test whether any data array elements evaluate to True.
Performs a logical ``or`` over the data array and returns the
result. Masked values are considered as False during computation.
.. seealso:: `all`, `allclose`, `isclose`
:Parameters:
axis: (sequence of) `int`, optional
Axis or axes along which a logical OR reduction is
performed. The default (`None`) is to perform a
logical OR over all the dimensions of the input
array. *axis* may be negative, in which case it counts
from the last to the first axis.
{{collapse keepdims: `bool`, optional}}
{{split_every: `int` or `dict`, optional}}
:Returns:
`Data`
Whether or not any data array elements evaluate to True.
**Examples**
>>> d = cf.Data([[0, 0, 0]])
>>> d = cf.Data([[0, 2], [0, 4]])
>>> d.any()
False
>>> d[0, 0] = cf.masked
>>> print(d.array)
[[-- 0 0]]
>>> d.any()
False
>>> d[0, 1] = 3
>>> print(d.array)
[[0 3 0]]
>>> d.any()
True
<CF Data(1, 1): [[True]]>
>>> d.any(keepdims=False)
<CF Data(): True>
>>> d.any(axis=0)
<CF Data(1, 2): [[False, True]]>
>>> d.any(axis=1)
<CF Data(2, 1): [[True, True]]>
>>> d.any(axis=())
<CF Data(2, 2): [[False, ..., True]]>
>>> d[0] = cf.masked
>>> print(d.array)
[[-- -- --]]
[[-- --]
[0 4]]
>>> d.any(axis=0)
<CF Data(1, 2): [[False, True]]>
>>> d.any(axis=1)
<CF Data(2, 1): [[--, True]]>
>>> d[...] = cf.masked
>>> d.any()
<CF Data(1, 1): [[--]]>
>>> bool(d.any())
False
>>> bool(d.any(keepdims=False))
False
"""
config = self.partition_configuration(readonly=True)

for partition in self.partitions.matrix.flat:
partition.open(config)
array = partition.array
if array.any():
partition.close()
return True

partition.close()

return False
d = self.copy(array=False)
dx = self._get_dask()
dx = da.any(dx, axis=axis, keepdims=keepdims, split_every=split_every)
d._set_dask(dx, reset_mask_hardness=False)
d.hardmask = _DEFAULT_HARDMASK
d.override_units(_units_None, inplace=True)
return d

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
Expand Down
25 changes: 16 additions & 9 deletions cf/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,17 +1864,16 @@ def _numpy_isclose(a, b, rtol=None, atol=None):
return a == b


# TODODASK - sort out the "numpy" environment


def parse_indices(shape, indices, cyclic=False, keepdims=True):
"""TODODASK.
"""Parse indices for array access and assignment.
:Parameters:
shape: sequence of `int`
The shape of the array.
indices: `tuple` (not a `list`!)
indices: `tuple`
The indices to be applied.
keepdims: `bool`, optional
If True then an integral index is converted to a
Expand All @@ -1887,12 +1886,20 @@ def parse_indices(shape, indices, cyclic=False, keepdims=True):
is also returned that contains the parameters needed to
interpret any cyclic slices.
**Examples:**
**Examples**
>>> cf.parse_indices((5, 8), ([1, 2, 4, 6],))
[array([1, 2, 4, 6]), slice(0, 8, 1)]
[array([1, 2, 4, 6]), slice(None, None, None)]
>>> cf.parse_indices((5, 8), (Ellipsis, [2, 4, 6]))
[slice(0, 5, 1), slice(2, 7, 2)]
[slice(None, None, None), [2, 4, 6]]
>>> cf.parse_indices((5, 8), (Ellipsis, 4))
[slice(None, None, None), slice(4, 5, 1)]
>>> cf.parse_indices((5, 8), (Ellipsis, 4), keepdims=False)
[slice(None, None, None), 4]
>>> cf.parse_indices((5, 8), (slice(-2, 2),), cyclic=False)
[slice(-2, 2, None), slice(None, None, None)]
>>> cf.parse_indices((5, 8), (slice(-2, 2),), cyclic=True)
([slice(0, 4, 1), slice(None, None, None)], {0: 2})
"""
parsed_indices = []
Expand Down Expand Up @@ -3025,7 +3032,7 @@ def environment(display=True, paths=True):
"Python": (platform.python_version(), sys.executable),
"netCDF4": _get_module_info("netCDF4"),
"cftime": _get_module_info("cftime"),
"numpy": (_numpy__version__, _os_path_abspath(_numpy__file__)),
"numpy": _get_module_info("numpy"),
"psutil": _get_module_info("psutil"),
"scipy": _get_module_info("scipy", try_except=True),
"matplotlib": _get_module_info("matplotlib", try_except=True),
Expand Down
51 changes: 33 additions & 18 deletions cf/test/test_Data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1447,33 +1447,48 @@ def test_Data_outerproduct(self):
self.assertIsNone(d.outerproduct(e, inplace=True))
self.assertEqual(d.shape, (40, 30, 5), d.shape)

@unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'")
def test_Data_all(self):
if self.test_only and inspect.stack()[0][3] not in self.test_only:
return

d = cf.Data(np.array([[0] * 1000]))
self.assertTrue(not d.all())
d[-1, -1] = 1
self.assertFalse(d.all())
d[...] = 1
d = cf.Data([[1, 2], [3, 4]], "m")
self.assertTrue(d.all())
self.assertEqual(d.all(keepdims=False).shape, ())
self.assertEqual(d.all(axis=()).shape, d.shape)
self.assertTrue((d.all(axis=0).array == [True, True]).all())
self.assertTrue((d.all(axis=1).array == [True, True]).all())
self.assertEqual(d.all().Units, cf.Units())

d[0] = cf.masked
d[1, 0] = 0
self.assertTrue((d.all(axis=0).array == [False, True]).all())
self.assertTrue(
(
d.all(axis=1).array == np.ma.array([True, False], mask=[1, 0])
).all()
)

d[...] = cf.masked
self.assertTrue(d.all())
self.assertFalse(d.all(keepdims=False))

@unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'")
def test_Data_any(self):
if self.test_only and inspect.stack()[0][3] not in self.test_only:
return

d = cf.Data(np.array([[0] * 1000]))
self.assertFalse(d.any())
d[-1, -1] = 1
self.assertTrue(d.any())
d[...] = 1
d = cf.Data([[0, 2], [0, 4]])
self.assertTrue(d.any())
self.assertEqual(d.any(keepdims=False).shape, ())
self.assertEqual(d.any(axis=()).shape, d.shape)
self.assertTrue((d.any(axis=0).array == [False, True]).all())
self.assertTrue((d.any(axis=1).array == [True, True]).all())
self.assertEqual(d.any().Units, cf.Units())

d[0] = cf.masked
self.assertTrue((d.any(axis=0).array == [False, True]).all())
self.assertTrue(
(
d.any(axis=1).array == np.ma.array([True, True], mask=[1, 0])
).all()
)

d[...] = cf.masked
self.assertFalse(d.any())
self.assertFalse(d.any(keepdims=False))

def test_Data_array(self):
if self.test_only and inspect.stack()[0][3] not in self.test_only:
Expand Down

0 comments on commit cd4dc74

Please sign in to comment.