Skip to content

Commit

Permalink
Replace dask 'compute()' usage with a common realisation call. (#2) (#…
Browse files Browse the repository at this point in the history
…2447)

Replace dask 'compute()' usage with a common realisation call.
  • Loading branch information
lbdreyer authored and bjlittle committed May 30, 2017
1 parent 0e26f81 commit e1ff306
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 49 deletions.
38 changes: 35 additions & 3 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ def is_lazy_data(data):

def as_lazy_data(data, chunks=_MAX_CHUNK_SIZE):
"""
Convert the input array `data` to a lazy dask array.
Convert the input array `data` to a dask array.
Args:
* data:
An array. This will be converted to a lazy dask array.
An array. This will be converted to a dask array.
Kwargs:
Expand All @@ -63,7 +63,7 @@ def as_lazy_data(data, chunks=_MAX_CHUNK_SIZE):
http://dask.pydata.org/en/latest/array-creation.html#chunks.
Returns:
The input array converted to a lazy dask array.
The input array converted to a dask array.
"""
if not is_lazy_data(data):
Expand All @@ -73,6 +73,38 @@ def as_lazy_data(data, chunks=_MAX_CHUNK_SIZE):
return data


def as_concrete_data(data, **kwargs):
    """
    Realise `data` as a concrete (non-lazy) array.

    A NumPy `ndarray` or masked array is handed back untouched.  A lazy
    array is computed, and any NaNs in the computed result are then
    translated (by filling, or by converting to masked data) using the
    :func:`~iris._lazy_data.convert_nans_array` function.

    Args:

    * data:
        A dask array, NumPy `ndarray` or masked array.

    Kwargs:

        Forwarded unchanged to
        :func:`~iris._lazy_data.convert_nans_array`.

    Returns:
        A NumPy `ndarray` or masked array.

    """
    if not is_lazy_data(data):
        # Already concrete: nothing to realise and nothing to convert.
        return data

    # Realise the dask array, then convert any missing data as requested.
    realised = data.compute()
    return convert_nans_array(realised, **kwargs)


def array_masked_to_nans(array):
"""
Convert a masked array to a NumPy `ndarray` filled with NaN values. Input
Expand Down
10 changes: 6 additions & 4 deletions lib/iris/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import numpy as np
import numpy.ma as ma

from iris._lazy_data import as_lazy_data, is_lazy_data, multidim_lazy_stack
from iris._lazy_data import (as_lazy_data, as_concrete_data, is_lazy_data,
multidim_lazy_stack)
import iris.cube
import iris.coords
import iris.exceptions
Expand Down Expand Up @@ -1217,10 +1218,11 @@ def merge(self, unique=True):
if all_have_data:
# All inputs were concrete, so turn the result back into a
# normal array.
merged_data = merged_data.compute()
# Unmask the array only if it is filled.
merged_data = as_concrete_data(merged_data,
nans_replacement=ma.masked)
# Unmask the array if it has no masked points.
if (ma.isMaskedArray(merged_data) and
ma.count_masked(merged_data) == 0):
not ma.is_masked(merged_data)):
merged_data = merged_data.data
merged_cube = self._get_cube(merged_data)
merged_cubes.append(merged_cube)
Expand Down
20 changes: 15 additions & 5 deletions lib/iris/coords.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import warnings
import zlib

from iris._lazy_data import is_lazy_data
import dask.array as da
import netcdftime
import numpy as np

from iris._lazy_data import as_concrete_data, is_lazy_data
import iris.aux_factory
import iris.exceptions
import iris.time
Expand Down Expand Up @@ -1635,7 +1635,11 @@ def _sanitise_array(self, src, ndmin):
def points(self):
"""Property containing the points values as a numpy array"""
if is_lazy_data(self._points):
self._points = self._points.compute()
self._points = as_concrete_data(self._points,
nans_replacement=np.ma.masked)
# NOTE: we probably don't have full support for masked aux-coords.
# We certainly *don't* handle a _FillValue attribute (and possibly
# the loader will throw one away ?)
return self._points.view()

@points.setter
Expand Down Expand Up @@ -1673,7 +1677,11 @@ def bounds(self):
if self._bounds is not None:
bounds = self._bounds
if is_lazy_data(bounds):
bounds = bounds.compute()
bounds = as_concrete_data(bounds,
nans_replacement=np.ma.masked)
# NOTE: we probably don't fully support masked aux-coords.
# We certainly *don't* handle a _FillValue attribute (and
# possibly the loader will throw one away ?)
self._bounds = bounds
bounds = bounds.view()
else:
Expand Down Expand Up @@ -1764,9 +1772,11 @@ def measure(self):
@property
def data(self):
"""Property containing the data values as a numpy array"""
data = self._data
if is_lazy_data(self._data):
self._data = self._data.compute()
self._data = as_concrete_data(self._data,
nans_replacement=np.ma.masked)
# NOTE: like AuxCoords, we probably don't fully support masks, and
# we certainly don't handle any _FillValue attribute.
return self._data.view()

@data.setter
Expand Down
16 changes: 7 additions & 9 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import iris._concatenate
import iris._constraints
from iris._deprecation import warn_deprecated
from iris._lazy_data import (array_masked_to_nans, as_lazy_data,
convert_nans_array, is_lazy_data)
from iris._lazy_data import as_concrete_data, as_lazy_data, is_lazy_data

import iris._merge
import iris.analysis
from iris.analysis.cartography import wrap_lons
Expand Down Expand Up @@ -1750,13 +1750,11 @@ def data(self):
"""
if self.has_lazy_data():
try:
data = self._dask_array.compute()
# Now convert the data payload from a NaN array to a
# masked array, and if appropriate cast to the specified
# cube result dtype.
result = convert_nans_array(data,
nans_replacement=ma.masked,
result_dtype=self.dtype)
# Realise the data, convert from a NaN array to a masked array,
# and if appropriate cast to the specified cube result dtype.
result = as_concrete_data(self._dask_array,
nans_replacement=ma.masked,
result_dtype=self.dtype)
self._numpy_array = result
self.dtype = None

Expand Down
10 changes: 6 additions & 4 deletions lib/iris/fileformats/pp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import netcdftime

from iris._deprecation import warn_deprecated
from iris._lazy_data import (array_masked_to_nans, as_concrete_data,
as_lazy_data, is_lazy_data)
import iris.config
import iris.fileformats.rules
import iris.fileformats.pp_rules
import iris.coord_systems
from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data


try:
import mo_pack
Expand Down Expand Up @@ -1280,11 +1282,11 @@ def data(self):
"""
# Cache the real data on first use
if is_lazy_data(self._data):
self._data = self._data.compute()
if self._data.dtype.kind == 'i' and self.bmdi == -1e30:
self.bmdi = -9999
self._data[np.isnan(self._data)] = self.bmdi
if is_lazy_data(self._data):
self._data = as_concrete_data(self._data,
nans_replacement=self.bmdi)
return self._data

@data.setter
Expand Down
13 changes: 6 additions & 7 deletions lib/iris/tests/unit/analysis/test_MEAN.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import numpy.ma as ma

from iris.analysis import MEAN
from iris._lazy_data import as_concrete_data


class Test_lazy_aggregate(tests.IrisTest):
Expand All @@ -42,24 +43,21 @@ def setUp(self):

def test_mdtol_default(self):
agg = MEAN.lazy_aggregate(self.array, axis=self.axis)
result = agg.compute()
masked_result = ma.masked_array(result, mask=np.isnan(result))
masked_result = as_concrete_data(agg, nans_replacement=ma.masked)
self.assertMaskedArrayAlmostEqual(masked_result,
self.expected_masked)

def test_mdtol_below(self):
agg = MEAN.lazy_aggregate(self.array, axis=self.axis, mdtol=0.3)
result = agg.compute()
masked_result = ma.masked_array(result, mask=np.isnan(result))
masked_result = as_concrete_data(agg, nans_replacement=ma.masked)
expected_masked = self.expected_masked
expected_masked.mask = [False, True, True, True]
self.assertMaskedArrayAlmostEqual(masked_result,
expected_masked)

def test_mdtol_above(self):
agg = MEAN.lazy_aggregate(self.array, axis=self.axis, mdtol=0.4)
result = agg.compute()
masked_result = ma.masked_array(result, mask=np.isnan(result))
masked_result = as_concrete_data(agg, nans_replacement=ma.masked)
self.assertMaskedArrayAlmostEqual(masked_result,
self.expected_masked)

Expand All @@ -68,8 +66,9 @@ def test_multi_axis(self):
collapse_axes = (0, 2)
lazy_data = as_lazy_data(data)
agg = MEAN.lazy_aggregate(lazy_data, axis=collapse_axes)
result = as_concrete_data(agg, nans_replacement=ma.masked)
expected = np.mean(data, axis=collapse_axes)
self.assertArrayAllClose(agg.compute(), expected)
self.assertArrayAllClose(result, expected)


class Test_name(tests.IrisTest):
Expand Down
11 changes: 6 additions & 5 deletions lib/iris/tests/unit/analysis/test_STD_DEV.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
# importing anything else.
import iris.tests as tests

from iris._lazy_data import as_lazy_data
import numpy as np

from iris._lazy_data import as_concrete_data, as_lazy_data
from iris.analysis import STD_DEV


Expand All @@ -37,21 +37,22 @@ def test_mdtol(self):
[1., 2., na, na]])
array = as_lazy_data(array)
var = STD_DEV.lazy_aggregate(array, axis=1, mdtol=0.3)
result = var.compute()
masked_result = np.ma.masked_array(result, mask=np.isnan(result))
masked_result = as_concrete_data(var, nans_replacement=np.ma.masked)
masked_expected = np.ma.masked_array([0.57735, 1., 0.707107],
mask=[0, 0, 1])
self.assertMaskedArrayAlmostEqual(masked_result, masked_expected)

def test_ddof_one(self):
array = as_lazy_data(np.arange(8))
var = STD_DEV.lazy_aggregate(array, axis=0, ddof=1)
self.assertArrayAlmostEqual(var.compute(), np.array(2.449489))
result = as_concrete_data(var)
self.assertArrayAlmostEqual(result, np.array(2.449489))

def test_ddof_zero(self):
array = as_lazy_data(np.arange(8))
var = STD_DEV.lazy_aggregate(array, axis=0, ddof=0)
self.assertArrayAlmostEqual(var.compute(), np.array(2.291287))
result = as_concrete_data(var)
self.assertArrayAlmostEqual(result, np.array(2.291287))


class Test_name(tests.IrisTest):
Expand Down
9 changes: 6 additions & 3 deletions lib/iris/tests/unit/analysis/test_VARIANCE.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
# importing anything else.
import iris.tests as tests

from iris._lazy_data import as_lazy_data
import numpy as np
import numpy.ma as ma

from iris._lazy_data import as_lazy_data, as_concrete_data
from iris.analysis import VARIANCE
import iris.cube
from iris.coords import DimCoord

from iris.tests import mock


Expand Down Expand Up @@ -71,12 +72,14 @@ class Test_lazy_aggregate(tests.IrisTest):
def test_ddof_one(self):
array = as_lazy_data(np.arange(8))
var = VARIANCE.lazy_aggregate(array, axis=0, ddof=1)
self.assertArrayAlmostEqual(var.compute(), np.array(6.0))
result = as_concrete_data(var)
self.assertArrayAlmostEqual(result, np.array(6.0))

def test_ddof_zero(self):
array = as_lazy_data(np.arange(8))
var = VARIANCE.lazy_aggregate(array, axis=0, ddof=0)
self.assertArrayAlmostEqual(var.compute(), np.array(5.25))
result = as_concrete_data(var)
self.assertArrayAlmostEqual(result, np.array(5.25))


class Test_name(tests.IrisTest):
Expand Down
Loading

0 comments on commit e1ff306

Please sign in to comment.