Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data.func and friends: migrate to Dask #300

Merged
88 changes: 46 additions & 42 deletions cf/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2822,6 +2822,7 @@ def can_compute(self, functions=None, log_levels=None, override=False):
]
)

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
def ceil(self, inplace=False, i=False):
"""The ceiling of the data, element-wise.
Expand Down Expand Up @@ -2854,7 +2855,7 @@ def ceil(self, inplace=False, i=False):
[-1. -1. -1. -1. 0. 1. 2. 2. 2.]

"""
return self.func(np.ceil, out=True, inplace=inplace)
return self.func(np.ceil, inplace=inplace)

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
Expand Down Expand Up @@ -6649,6 +6650,7 @@ def seterr(all=None, divide=None, over=None, under=None, invalid=None):
return old

# `arctan2`, AT2 seealso
@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def arctan(self, inplace=False):
"""Take the trigonometric inverse tangent of the data element-
Expand Down Expand Up @@ -6735,6 +6737,7 @@ def arctan(self, inplace=False):
# '''
# return cls(numpy_arctan2(y, x), units=_units_radians)

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def arctanh(self, inplace=False):
"""Take the inverse hyperbolic tangent of the data element-wise.
Expand Down Expand Up @@ -6787,6 +6790,7 @@ def arctanh(self, inplace=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def arcsin(self, inplace=False):
"""Take the trigonometric inverse sine of the data element-wise.
Expand Down Expand Up @@ -6839,6 +6843,7 @@ def arcsin(self, inplace=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def arcsinh(self, inplace=False):
"""Take the inverse hyperbolic sine of the data element-wise.
Expand Down Expand Up @@ -6883,6 +6888,7 @@ def arcsinh(self, inplace=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def arccos(self, inplace=False):
"""Take the trigonometric inverse cosine of the data element-
Expand Down Expand Up @@ -6936,6 +6942,7 @@ def arccos(self, inplace=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def arccosh(self, inplace=False):
"""Take the inverse hyperbolic cosine of the data element-wise.
Expand Down Expand Up @@ -8521,6 +8528,7 @@ def compressed(self, inplace=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def cos(self, inplace=False, i=False):
Expand Down Expand Up @@ -9180,6 +9188,7 @@ def equals(
else:
return True

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def exp(self, inplace=False, i=False):
Expand Down Expand Up @@ -10126,6 +10135,7 @@ def flatten(self, axes=None, inplace=False):

return out

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
def floor(self, inplace=False, i=False):
"""Return the floor of the data array.
Expand Down Expand Up @@ -10153,7 +10163,7 @@ def floor(self, inplace=False, i=False):
[-2. -2. -2. -1. 0. 1. 1. 1. 1.]

"""
return self.func(np.floor, out=True, inplace=inplace)
return self.func(np.floor, inplace=inplace)

@_deprecated_kwarg_check("i")
def outerproduct(self, e, inplace=False, i=False):
Expand Down Expand Up @@ -11010,6 +11020,7 @@ def isclose(self, y, rtol=None, atol=None):
except (TypeError, NotImplementedError, IndexError):
return self == y

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
def rint(self, inplace=False, i=False):
"""Round the data to the nearest integer, element-wise.
Expand Down Expand Up @@ -11039,7 +11050,7 @@ def rint(self, inplace=False, i=False):
[-2. -2. -1. -1. 0. 1. 1. 2. 2.]

"""
return self.func(np.rint, out=True, inplace=inplace)
return self.func(np.rint, inplace=inplace)

def root_mean_square(
self,
Expand Down Expand Up @@ -11120,6 +11131,7 @@ def root_mean_square(
_preserve_partitions=_preserve_partitions,
)

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
def round(self, decimals=0, inplace=False, i=False):
"""Evenly round elements of the data array to the given number
Expand Down Expand Up @@ -11164,9 +11176,7 @@ def round(self, decimals=0, inplace=False, i=False):
[-0., -0., -0., -0., 0., 0., 0., 0., 0.]

"""
return self.func(
np.round, out=True, inplace=inplace, decimals=decimals
)
return self.func(np.round, inplace=inplace, decimals=decimals)

def stats(
self,
Expand Down Expand Up @@ -11760,6 +11770,7 @@ def where(

return d

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def sin(self, inplace=False, i=False):
Expand Down Expand Up @@ -11816,6 +11827,7 @@ def sin(self, inplace=False, i=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def sinh(self, inplace=False):
Expand Down Expand Up @@ -11872,6 +11884,7 @@ def sinh(self, inplace=False):
d.func(np.sinh, units=_units_1, inplace=True)
return d

@daskified(_DASKIFIED_VERBOSE)
@_inplace_enabled(default=False)
def cosh(self, inplace=False):
"""Take the hyperbolic cosine of the data element-wise.
Expand Down Expand Up @@ -11927,6 +11940,7 @@ def cosh(self, inplace=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def tanh(self, inplace=False):
Expand Down Expand Up @@ -11985,6 +11999,9 @@ def tanh(self, inplace=False):

return d

# TODOASK: daskified except in the case of arbitrary base (not e, 2 or 10)
# which requires `__itruediv__` to be daskified.
# @daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def log(self, base=None, inplace=False, i=False):
Expand Down Expand Up @@ -12130,6 +12147,7 @@ def squeeze(self, axes=None, inplace=False, i=False):
return d

# `arctan2`, AT2 seealso
@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def tan(self, inplace=False, i=False):
Expand Down Expand Up @@ -12282,6 +12300,7 @@ def transpose(self, axes=None, inplace=False, i=False):

return d

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
def trunc(self, inplace=False, i=False):
"""Return the truncated values of the data array.
Expand Down Expand Up @@ -12313,7 +12332,7 @@ def trunc(self, inplace=False, i=False):
[-1. -1. -1. -1. 0. 1. 1. 1. 1.]

"""
return self.func(np.trunc, out=True, inplace=inplace)
return self.func(np.trunc, inplace=inplace)

@classmethod
def empty(
Expand Down Expand Up @@ -12447,6 +12466,8 @@ def zeros(cls, shape, dtype=None, units=None, calendar=None, chunk=True):
shape, 0, dtype=dtype, units=units, calendar=calendar, chunk=chunk
)

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("out")
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def func(
Expand All @@ -12468,7 +12489,7 @@ def func(

units: `Units`, optional

out: `bool`, optional
out: deprecated at version 4.0.0

{{inplace: `bool`, optional}}

Expand Down Expand Up @@ -12516,44 +12537,27 @@ def func(

sadielbartholomew marked this conversation as resolved.
Show resolved Hide resolved
"""
d = _inplace_enabled_define_and_cleanup(self)
dx = d._get_dask()

config = d.partition_configuration(readonly=False)

datatype = d.dtype

for partition in d.partitions.matrix.flat:
partition.open(config)
array = partition.array

# Steps for masked data when want to preserve invalid values:
# Step 1. extract the non-masked data and the mask separately
detach_mask = preserve_invalid and np.ma.isMA(array)
if detach_mask:
mask = array.mask # must store mask before detach it below
array = array.data # mask detached

if out:
f(array, out=array, **kwargs)
else:
# Step 2: apply operation to data alone
array = f(array, **kwargs)

p_datatype = array.dtype
if datatype != p_datatype:
datatype = np.result_type(p_datatype, datatype)

if detach_mask:
# Step 3: reattach original mask onto the output data
array = np.ma_array(array, mask=mask)

partition.subarray = array
# TODODASK: Steps to preserve invalid values shown, taking same
# approach as pre-daskification, but maybe we can now change approach
# to avoid finding mask and data, which requires early compute...
# Step 1. extract the non-masked data and the mask separately
if preserve_invalid:
# Assume all inputs are masked, as checking for a mask to confirm
# is expensive. If unmasked, effective mask will be all False.
dx_mask = da.ma.getmaskarray(dx) # store original mask
dx = da.ma.getdata(dx)

if units is not None:
partition.Units = units
# Step 2: apply operation to data alone
axes = tuple(range(dx.ndim))
dx = da.blockwise(f, axes, dx, axes, **kwargs)

partition.close()
if preserve_invalid:
# Step 3: reattach original mask onto the output data
dx = da.ma.masked_array(dx, mask=dx_mask)

d.dtype = datatype
d._set_dask(dx, reset_mask_hardness=True)

if units is not None:
d._Units = units
Expand Down
Loading