Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask chunking control for netcdf loading. #4572

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 103 additions & 54 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,29 @@ def is_lazy_data(data):
return result


def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")):
def _optimum_chunksize(
chunks, shape, limit=None, dtype=np.dtype("f4"), dims_fixed=None
):
"""
Reduce or increase an initial chunk shape to get close to a chosen ideal
size, while prioritising the splitting of the earlier (outer) dimensions
and keeping intact the later (inner) ones.

Args:

* chunks (tuple of int, or None):
* chunks (tuple of int):
Pre-existing chunk shape of the target data : None if unknown.
* shape (tuple of int):
The full array shape of the target data.
* limit (int):
The 'ideal' target chunk size, in bytes. Default from dask.config.
* dtype (np.dtype):
Numpy dtype of target data.
* dims_fixed (list of bool):
If set, a list of values equal in length to 'chunks' or 'shape'.
'True' values indicate a dimension that can not be changed, i.e. that
element of the result must equal the corresponding value in 'chunks' or
data.shape.

Returns:
* chunk (tuple of int):
Expand All @@ -83,6 +90,9 @@ def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")):
"chunks = [c[0] for c in normalise_chunks('auto', ...)]".

"""
if chunks is None:
chunks = list(shape)

# Set the chunksize limit.
if limit is None:
# Fetch the default 'optimal' chunksize from the dask config.
Expand All @@ -92,61 +102,93 @@ def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")):

point_size_limit = limit / dtype.itemsize

# Create result chunks, starting with a copy of the input.
result = list(chunks)

if np.prod(result) < point_size_limit:
# If size is less than maximum, expand the chunks, multiplying later
# (i.e. inner) dims first.
i_expand = len(shape) - 1
while np.prod(result) < point_size_limit and i_expand >= 0:
factor = np.floor(point_size_limit * 1.0 / np.prod(result))
new_dim = result[i_expand] * int(factor)
if new_dim >= shape[i_expand]:
# Clip to dim size : chunk dims must not exceed the full shape.
new_dim = shape[i_expand]
else:
# 'new_dim' is less than the relevant dim of 'shape' -- but it
# is also the largest possible multiple of the input-chunks,
# within the size limit.
# So : 'i_expand' is the outer (last) dimension over which we
# will multiply the input chunks, and 'new_dim' is a value that
# ensures the fewest possible chunks within that dim.

# Now replace 'new_dim' with the value **closest to equal-size
# chunks**, for the same (minimum) number of chunks.
# More-equal chunks are practically better.
# E.G. : "divide 8 into multiples of 2, with a limit of 7",
# produces new_dim=6, which would mean chunks of sizes (6, 2).
# But (4, 4) is clearly better for memory and time cost.

# Calculate how many (expanded) chunks fit into this dimension.
dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim)
# Get "ideal" (equal) size for that many chunks.
ideal_equal_chunk_size = shape[i_expand] / dim_chunks
# Use the nearest whole multiple of input chunks >= ideal.
new_dim = int(
result[i_expand]
* np.ceil(ideal_equal_chunk_size / result[i_expand])
)

result[i_expand] = new_dim
i_expand -= 1
if dims_fixed is not None:
if not np.any(dims_fixed):
dims_fixed = None

if dims_fixed is None:
# Get initial result chunks, starting with a copy of the input.
working = list(chunks)
else:
# Adjust the operation to ignore the 'fixed' dims.
# (We reconstruct the original later, before return).
chunks = np.array(chunks)
dims_fixed_arr = np.array(dims_fixed)
# Reduce the target size by the fixed size of all the 'fixed' dims.
point_size_limit = point_size_limit // np.prod(chunks[dims_fixed_arr])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to somehow allow Dask's special chunk values (-1, 'auto' and None), but I guess this is at odds with doing this logic on Iris side. Have you tried to inquire if Dask would be interested to add/extend chunking logic to support such use cases? I think it would make the most sense to boot this out of Iris as per the note here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(background, not really an answer!)

The problem with the existing Dask built-in strategy is that is treats all dimensions as equal and tries to even out the chunking across all dims. Which is pretty clearly unsuitable in most of our cases, given the known contiguity factors in file storage, and especially when the file contains 2d-fields which cannot be efficiently sub-indexed.
Which is why we provided the Iris version, to prioritise splitting of the "outer" dims and keeping the "inner" ones together.

But it's certainly true that we could feedback and engage, as-per the note you mention.
Unfortunately though, the dask code is really complicated and engagement seems a bit of an uphill struggle IMHO.
By which I mean ... I have repeatedly attempted to engage with the dask codebase, and maybe contribute, but I've found it very hard to understand, with a lot of unexplained terminology and framework concepts.
So it seems to me that there is a problem with contributing unless you are an "expert", because there is a lot that you need to know beyond what the "ordinary" user sees.

In fact, as a very relevant example, I considered addressing this same chunking control problem by picking apart the graph of a loaded cube to identify and adjust (or replace) the iris.fileformats.netcdf.NetCDFDataProxy objects which are in there. It's perfectly do-able, but you need to understand the structure of the dask graph and employ various assumptions about it : However, there is no public account of that, or any apparent intention for "ordinary users" to manipulate them in this way. So it would seem that a dask graph is really an "opaque" object, and the internal structure might get refactored and change completely, in which case such a solution, being outside intended usage, would simply need re-writing. So effectively, AFAICT only a dask "expert" is expected to work at that level and, crucially, such a solution only really makes sense if the code lives in the dask codebase.

Copy link
Member Author

@pp-mo pp-mo Feb 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomekTrzeciak It would be nice to somehow allow Dask's special chunk values (-1, 'auto' and None),

Reminder of how that is described :

Chunks also includes three special values:
   * -1: no chunking along this dimension
   * None: no change to the chunking along this dimension (useful for rechunk)
   * "auto": allow the chunking in this dimension to accommodate ideal chunk sizes

From a technical POV, I guess we could respect -1, but the others are a little tricky :
"Auto" is sort-of what we are already doing, with the dims that don't have an explicit control given.
But it isn't quite what Dask means, since we will still want to apply our dimension ordering priority.
"None" I think could only really apply if we treat chunking specified on a file variable as the initial setting. At present, we do use that as a starting point anyway, so I'm not sure how this would differ from "auto" in this usage.

Can you suggest what the interpretations would be -- how would this work ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately though, the dask code is really complicated and engagement seems a bit of an uphill struggle IMHO.
By which I mean ... I have repeatedly attempted to engage with the dask codebase, and maybe contribute, but I've found it very hard to understand, with a lot of unexplained terminology and framework concepts.

Oh, I didn't mean to suggest you make a PR to Dask, only that it may be good to have a conversation and make them aware of the issue.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a technical POV, I guess we could respect -1, but the others are a little tricky :
"Auto" is sort-of what we are already doing, with the dims that don't have an explicit control given. But it isn't quite what Dask means, since we will still want to apply our dimension ordering priority.
"None" I think could only really apply if we treat chunking specified on a file variable as the initial setting. At present, we do use that as a starting point anyway, so I'm not sure how this would differ from "auto" in this usage.

Can you suggest what the interpretations would be -- how would this work ?

For None the only sensible interpretation I can think of is to keep the chunking from the netcdf file itself as you suggest, but dunno what Dask is doing in that case.

I reckon Auto would require repeating the logic that Dask is doing but restricted only to the specified dimensions. Seem quite tricky, probably best not to support it if the implementation is going to live on the Iris' side.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"None" I think could only really apply if we treat chunking specified on a file variable as the initial setting. At present, we do use that as a starting point anyway, so I'm not sure how this would differ from "auto" in this usage.

For None the only sensible interpretation I can think of is to keep the chunking from the netcdf file itself as you suggest, but dunno what Dask is doing in that case.

Sorry, I've re-read your comment and I think this is not what you suggest. My interpretation of None is that it behaves the same as if you would explicitly specify chunks equal to those already existing on the variable, i.e., this is their final rather than initial value.

# Work on only the 'free' dims.
original_shape = tuple(shape)
shape = tuple(np.array(shape)[~dims_fixed_arr])
working = list(chunks[~dims_fixed_arr])

if len(working) >= 1:
if np.prod(working) < point_size_limit:
# If size is less than maximum, expand the chunks, multiplying
# later (i.e. inner) dims first.
i_expand = len(shape) - 1
while np.prod(working) < point_size_limit and i_expand >= 0:
factor = np.floor(point_size_limit * 1.0 / np.prod(working))
new_dim = working[i_expand] * int(factor)
if new_dim >= shape[i_expand]:
# Clip to dim size : must not exceed the full shape.
new_dim = shape[i_expand]
else:
# 'new_dim' is less than the relevant dim of 'shape' -- but
# it is also the largest possible multiple of the
# input-chunks, within the size limit.
# So : 'i_expand' is the outer (last) dimension over which
# we will multiply the input chunks, and 'new_dim' is a
# value giving the fewest possible chunks within that dim.

# Now replace 'new_dim' with the value **closest to
# equal-size chunks**, for the same (minimum) number of
# chunks. More-equal chunks are practically better.
# E.G. : "divide 8 into multiples of 2, with a limit of 7",
# produces new_dim=6, meaning chunks of sizes (6, 2).
# But (4, 4) is clearly better for memory and time cost.

# Calculate how many (expanded) chunks fit in this dim.
dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim)
# Get "ideal" (equal) size for that many chunks.
ideal_equal_chunk_size = shape[i_expand] / dim_chunks
# Use the nearest whole multiple of input chunks >= ideal.
new_dim = int(
working[i_expand]
* np.ceil(ideal_equal_chunk_size / working[i_expand])
)

working[i_expand] = new_dim
i_expand -= 1
else:
# Similarly, reduce if too big, reducing earlier (outer) dims first.
i_reduce = 0
while np.prod(working) > point_size_limit:
factor = np.ceil(np.prod(working) / point_size_limit)
new_dim = int(working[i_reduce] / factor)
if new_dim < 1:
new_dim = 1
working[i_reduce] = new_dim
i_reduce += 1

working = tuple(working)

if dims_fixed is None:
result = working
else:
# Similarly, reduce if too big, reducing earlier (outer) dims first.
i_reduce = 0
while np.prod(result) > point_size_limit:
factor = np.ceil(np.prod(result) / point_size_limit)
new_dim = int(result[i_reduce] / factor)
if new_dim < 1:
new_dim = 1
result[i_reduce] = new_dim
i_reduce += 1
# Reconstruct the original form
result = []
for i_dim in range(len(original_shape)):
if dims_fixed[i_dim]:
dim = chunks[i_dim]
else:
dim = working[0]
working = working[1:]
result.append(dim)

return tuple(result)
return result


def as_lazy_data(data, chunks=None, asarray=False):
def as_lazy_data(data, chunks=None, asarray=False, dims_fixed=None):
"""
Convert the input array `data` to a dask array.

Expand All @@ -165,6 +207,11 @@ def as_lazy_data(data, chunks=None, asarray=False):
If True, then chunks will be converted to instances of `ndarray`.
Set to False (default) to pass passed chunks through unchanged.

* dims_fixed (list of bool):
If set, a list of values equal in length to 'chunks' or data.ndim.
'True' values indicate a dimension which can not be changed, i.e. the
result for that index must equal the value in 'chunks' or data.shape.

Returns:
The input array converted to a dask array.

Expand All @@ -186,7 +233,9 @@ def as_lazy_data(data, chunks=None, asarray=False):
# PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
if all(elem > 0 for elem in data.shape):
# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype)
chunks = _optimum_chunksize(
chunks, shape=data.shape, dtype=data.dtype, dims_fixed=dims_fixed
)

if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)
Expand Down
Loading