Feature/dask poc #192

Merged: 48 commits merged into ome:master on Aug 26, 2022

Conversation

@toloudis (Contributor) commented Apr 15, 2022

Admittedly I am an extremely naive dask user, but I'm taking a stab at this because I really want to pass a large dask array into the ome-zarr writer here.

I wonder if anyone can chime in on how to make the resize work inside of Scaler, or indeed how to make this code more correct. This is heavily WIP, just to try to get anything working at first. (For the array I'm passing in now, there's a size/broadcast mismatch error when it gets to compute in the resizing code.)

I am loading some data using aicsimageio's get_image_dask_data and then basically forwarding it into this writer (through the WIP aicsimageio.writers.OmeZarrWriter)

Possibly addressing #187

@codecov bot commented Apr 15, 2022

Codecov Report

Merging #192 (607b85c) into master (9343c9c) will increase coverage by 0.29%.
The diff coverage is 88.99%.

@@            Coverage Diff             @@
##           master     #192      +/-   ##
==========================================
+ Coverage   83.90%   84.19%   +0.29%     
==========================================
  Files          12       13       +1     
  Lines        1379     1474      +95     
==========================================
+ Hits         1157     1241      +84     
- Misses        222      233      +11     
Impacted Files           Coverage Δ
ome_zarr/reader.py       85.00% <0.00%> (ø)
ome_zarr/dask_utils.py   71.42% <71.42%> (ø)
ome_zarr/writer.py       95.45% <93.75%> (-0.83%) ⬇️
ome_zarr/scale.py        68.42% <95.65%> (+4.70%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

)
if isinstance(out, dask.array.Array):
new_stack = dask.array.zeros(
(*shape_dims, out.shape[0], out.shape[1]),

Contributor Author (@toloudis):

the use of out.shape here is questionable because the result of resize was delayed

Contributor Author (@toloudis):

this is now resolved because the dask chunks were set appropriately

@toloudis (Contributor, Author):

after the last couple of commits, the writer seems to be working in my simple test case (only the "nearest" method has been modified to use dask in the Scaler)

@toloudis (Contributor, Author):

calling compute_mip before trying to store anything out is potentially problematic for very large data even in dask land.

@joshmoore (Member):

cc: @jakirkham here just in case he has any https://data-apis.org/ insight.

@toloudis (Contributor, Author):

The delayed create_mip is definitely slow with dask, and it is one of those steps where it appears that nothing is happening in between writing out levels. Something seems inefficient; maybe we could process all levels for each plane while we have it in memory (sketched below), but then the write pattern changes completely. On the other hand, maybe this is as good as it gets?
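
For illustration, a rough sketch (not code from this PR; the downscale and write_plane helpers are made-up placeholders) of the alternative loop order described above, where every pyramid level is produced from a plane while that plane is still in memory:

def write_all_levels_per_plane(planes, n_levels, downscale, write_plane):
    for t, plane in enumerate(planes):  # each full-resolution 2D plane
        level_plane = plane
        for level in range(n_levels):
            write_plane(level, t, level_plane)    # store this plane at this level
            level_plane = downscale(level_plane)  # reuse it for the next level

As noted, this trades fewer recomputations for a very different (per-plane, level-interleaved) write pattern.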

@will-moore (Member):

I tested a simple case using sample code at https://ome-zarr.readthedocs.io/en/stable/python.html#writing-ome-ngff-images
but with data = da.from_array(data) and this seems to work fine.

I tried a more 'real' example, loading data from "https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.1/6001247.zarr" (see below). This takes a while and I end up with the 6001247.zarr dir, including .zattrs and resolutions dirs /0 - /4, each containing only the .zarray but no chunks.

import dask.array as da
import zarr
from zarr.storage import FSStore
import os

from ome_zarr.io import parse_url
from ome_zarr.writer import write_image

url = "https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.1/6001247.zarr"
kwargs = {
    "dimension_separator": "/",
    "normalize_keys": False,
}

store = FSStore(url, mode='r', **kwargs)
data = da.from_zarr(store, '0')
print('data', data)

path = "6001247.zarr"
os.mkdir(path)

write_store = parse_url(path, mode="w").store
root = zarr.group(store=write_store)
write_image(image=data, group=root, axes="tczyx")

I haven't looked at any resize issues yet.
I'll try downloading some sample data...

@toloudis do you want to share your code you're using to test this?

@toloudis (Contributor, Author) commented Apr 19, 2022

The code I am using to test is inside of aicsimageio's feature/zarrwriter branch

https://github.com/AllenCellModeling/aicsimageio/tree/feature/zarrwriter
also known as
AllenCellModeling/aicsimageio#381

The key files are aicsimageio/writers/ome_zarr_writer.py and scripts/makezarr.ipynb which loads some data using aicsimageio into a dask array, and then calls the ome_zarr_writer to save it to zarr.

One interesting thing is that the OmeZarrWriter (which calls this branch of ome-zarr-py) selects chunk sizes using a configurable size limit (I chose 16 MB for testing, but it could be parameterized).
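
For illustration only, a hypothetical sketch of picking a chunk shape under a byte limit such as the 16 MB mentioned above (the helper name and the halve-the-largest-dimension strategy are assumptions, not the OmeZarrWriter implementation):

import numpy as np

def suggest_chunks(shape, dtype, limit_bytes=16 * 2**20):
    itemsize = np.dtype(dtype).itemsize
    chunks = list(shape)
    # halve the largest dimension until the chunk fits within the byte limit
    while np.prod(chunks) * itemsize > limit_bytes and max(chunks) > 1:
        largest = int(np.argmax(chunks))
        chunks[largest] = max(1, chunks[largest] // 2)
    return tuple(chunks)

# e.g. suggest_chunks((1, 2, 50, 1024, 1024), "uint16") returns a chunk shape under 16 MB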

@joshmoore (Member):

cc: @aeisenbarth if there's anything from the https://github.com/aeisenbarth/ngff-writer front

@joshmoore (Member):

url = "https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.1/6001247.zarr"
kwargs = {
    "dimension_separator": "/",
    "normalize_keys": False,
}

@will-moore, I think your missing data is due to the fact that you're testing with a v0.1 dataset that uses "." separators. I can get the code working with a freshly converted dataset.

@aeisenbarth (Contributor):

  • dask.array.to_zarr: In my experience, computing pyramid levels with dask and writing them immediately is inefficient (dask.array.to_zarr(…, compute=True)). The next pyramid level does not receive the computed array from the previous level, only the delayed object, so the next step recomputes the downscaling twice, the step after that three times, etc. Instead, we get the delayed objects for to_zarr and let dask compute them all at once (see the sketch after this list). This gives a single dask graph with several sinks instead of several independent dask graphs.

  • _by_plane: The changes look good to me, also performance-wise. We use a somewhat different approach which is generalized for arbitrary axis order: We move/reshape axes so that we have blocks of to-be-resized axes (spatial 2D/or 3D), then iterate and resize them and restore the desired shape. Using the array protocol the same code works for Numpy and Dask arrays.
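
A minimal sketch of the single-graph pattern described in the first bullet (not the PR's implementation; pyramid and group are placeholders for a list of dask arrays and a zarr group):

import dask
import dask.array as da

def write_pyramid_in_one_graph(pyramid, group):
    delayed_writes = []
    for level, arr in enumerate(pyramid):
        # compute=False returns a Delayed object instead of writing immediately
        delayed_writes.append(
            da.to_zarr(arr, group.store, component=str(level), compute=False)
        )
    # one compute() over all sinks, so intermediate results are shared between levels
    dask.compute(*delayed_writes)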

@will-moore (Member):

Thanks to @joshmoore for pointing out the errors in my script above (wrong dimension_separator for "v0.1" data).
I have fixed that, and also added "re-writing" logic, so that the first time you run the script it downloads the zarr from the remote URL, and the second time you run it, it instead reads from the downloaded zarr and writes a copy (to test the speed of writing dask without network delays).

That script is here (it takes ~4 minutes to rewrite the zarr, using this PR):

write_dask_to_omezarr.py
import os
from datetime import datetime

import zarr

from ome_zarr.io import parse_url
from ome_zarr.writer import write_image

url = "https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.3/idr0095B/11511419.zarr"
target = "11511419.zarr"

# if we've already downloaded, try local re-write (perf not due to network speed)
if os.path.exists(target):
    url = target
    target = "out_" + target
    print("re-writing to", target)
else:
    print("downloading from", url)

reader = parse_url(url, mode="r")
# read full-size resolution
data = reader.load('0')

print('data', data)
print('attrs', reader.root_attrs)

ndims = len(data.shape)
default_axes = "tczyx"[5-ndims:]

axes = reader.root_attrs["multiscales"][0].get("axes", default_axes)
print("axes", axes)

os.mkdir(target)

start_time = datetime.now()
print("start_time", start_time)

# write the image data
write_store = parse_url(target, mode="w").store
root = zarr.group(store=write_store)
# Scaler will downsample dask data in X and Y per plane
write_image(image=data, group=root, axes=axes)

print("done. Duration:", datetime.now() - start_time)

To compare with ngff-writer, here's another script that does the same thing, using ngff-writer.
It performs the re-write in less than 0.01 seconds (thanks to @aeisenbarth for explaining the logic of this):

ngff-writer-from-url.py
from ngff_writer.writer import open_ngff_zarr
import os
from datetime import datetime

from ome_zarr.io import parse_url

url = "https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.3/idr0095B/11511419.zarr"
target = "11511419.zarr"

output_dir = "output"
collection_name = "my_images"

target_path = os.path.join(output_dir, collection_name, target)

# if we've already downloaded, try local re-write (perf not due to network speed)
if os.path.exists(target_path):
    url = target_path
    target = "rewrite_" + target
    print("reading from ", target_path)
    print("re-writing to", target)
else:
    print("downloading from", url)

reader = parse_url(url, mode="r")
multiscale = reader.root_attrs["multiscales"][0]
# read full-size resolution
data = reader.load(multiscale["datasets"][0]["path"])
axes = multiscale.get("axes")
print("axes", axes)
axes_names = None
if axes is not None:
    axes_names = axes
print("axes_names", axes_names)

start_time = datetime.now()
print("start_time", start_time)

with open_ngff_zarr(
    store=output_dir,
    dimension_separator="/",
    overwrite=True,
) as f:
    collection = f.add_collection(name=collection_name)

    image = collection.add_image(
        image_name=target,
        dimension_axes=axes_names,
        array=data
    )

print("done. Duration:", datetime.now() - start_time)

It might take a bit of work to adopt the approach that @aeisenbarth uses, but it's certainly what's needed!
@toloudis Do you want to look into that, or I can have a go if you want?

@will-moore (Member):

Looking into this, I noticed that ngff-writer is using a faster way of down-sampling: slicing the image (https://github.com/aeisenbarth/ngff-writer/blob/74bd10f169a8953ac560136785dc02c41db6997a/ngff_writer/dask_utils.py#L43)
Using that same function in ome_zarr.scale.nearest() to simply return downscale_nearest(plane, factors=(self.downscale, self.downscale)) fixes the performance issue in this PR (for the small sample image in my scripts above).
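
For reference, a rough sketch of that slicing approach (assuming a 2D plane and integer factors; this is modelled on, but not copied from, ngff-writer's downscale_nearest):

import dask.array as da

def downscale_by_slicing(plane, factors=(2, 2)):
    fy, fx = factors
    # keep every fy-th row and fx-th column: nearest-neighbour downsampling by an
    # integer factor; slicing a dask array stays lazy (no pixels are computed here)
    return plane[::fy, ::fx]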

@will-moore (Member):

However, if I also tell ngff-writer to use resize instead of slicing (see diff) then it is still very fast.

diff
$ git diff
diff --git a/ngff_writer/array_utils.py b/ngff_writer/array_utils.py
index a07f571..63a796f 100644
--- a/ngff_writer/array_utils.py
+++ b/ngff_writer/array_utils.py
@@ -9,7 +9,7 @@ import zarr
 from dask import array as da
 
 from ngff_writer.constants import DIMENSION_AXES, SPATIAL_DIMENSIONS
-from ngff_writer.dask_utils import downscale_nearest
+from ngff_writer.dask_utils import downscale_nearest, resize
 from ngff_writer.typing import DimensionAxisType
 from ngff_writer.validation import validate_axes_names
 
@@ -74,7 +74,7 @@ def ngff_spatially_rescale(
             min(dim, round(1 / scale)) if name in SPATIAL_DIMENSIONS else 1
             for dim, name in zip(image.shape, axes_names)
         )
-        _resize = lambda image, out_shape, **kwargs: downscale_nearest(image, factors)
+        _resize = lambda image, out_shape, **kwargs: resize(image, out_shape)
         _smooth = dask_image.ndfilters.gaussian_filter
     else:
         _resize = skimage.transform.resize

@will-moore (Member):

Another issue with the current writing to disk is that the dtype seems to get written as float64 ("dtype": "<f8") even though the dask array dtype was uint16. I don't really know what's causing this.
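
A quick diagnostic sketch (the "6001247.zarr/0" path is an assumption based on the scripts above) to compare the dtype recorded on disk with the source dask array's dtype:

import json
import zarr

arr = zarr.open("6001247.zarr/0", mode="r")
print("written array dtype:", arr.dtype)  # reported here as float64 ("<f8")
with open("6001247.zarr/0/.zarray") as f:
    print(".zarray dtype field:", json.load(f)["dtype"])
# the source array's dtype can be checked with data.dtype before calling write_image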

@toloudis (Contributor, Author):

It might take a bit of work to adopt the approach that @aeisenbarth uses, but it's certainly what's needed! @toloudis Do you want to look into that, or I can have a go if you want?

I am unable to work on this over the next couple of weeks. I will return to it when I can but please feel free to move forward.

resize() gets messed-up, so pyramid gets bigger instead of smaller
@will-moore (Member):

That's a fair point: there are certainly cases where the data may be delayed but doesn't necessarily suffer the performance hit we saw.

So we should probably have tests for dask in all of those write multiscale/image/labels functions.
I added just one test earlier, but it might be easier to extend coverage by adding a use_dask parametrization to existing test(s) with:

if use_dask:
    data = da.from_array(data)

@aeisenbarth (Contributor):

In my tests, I use a similar pattern for testing multiple array implementations:

@pytest.mark.parametrize("array_constructor", [np.array, da.from_array, zarr.array])
def test_func(array_constructor):
    ...
    data = array_constructor(data)

@joshmoore (Member):

Re-launched workflows. @toloudis, anything outstanding from your side?

@toloudis (Contributor, Author) commented Jul 11, 2022

There is probably some work to be done to consolidate different methods inside of Scaler (by_plane vs the dask codepath that does something different), and, possibly related, I still have performance concerns about non-nearest scaling with large dask arrays resulting in large dask compute graphs... but I think functionally this is working and all tests are passing.

@@ -302,6 +315,7 @@ def write_multiscales_metadata(
         dict(
             version=fmt.version,
             datasets=_validate_datasets(datasets, ndim, fmt),
+            name=name if name else group.name,

Contributor Author (@toloudis):
This line might be too heavy-handed, as the spec says that name "SHOULD" be provided but not "MUST". Is it OK for this code to write out group.name? I believe there were tests that depended on name having been written.

@sbesson (Member) commented Jul 27, 2022:

Interesting proposal. I would also tend to agree that writers should attempt to be as compliant with the recommended specification as possible.
I also suspect that using the group name is already the fallback behavior of consumers like napari when no name metadata is found. In the case of the HCS specification (and other nested layouts), this behavior might lead to uninformative names (like 0, 1, ...), but I don't know that it leaves us in a worse place than now.

@toloudis (Contributor, Author):

The thing I said about refining the Scaler is probably good for a separate PR and not this one. Merging this PR should not be disruptive to the non-dask workflow, and only enables dask arrays to be passed in.

@jakirkham:

Is there anything else needed here before merging? Sounds like no, but wanted to check 🙂

@sbesson (Member) left a review comment:

A few changes requested inline but nothing major.

Coming back to the API questions raised in #192 (review), I think the following commits addressed most of my questions, but the resolution state is a bit unclear. Would it be possible to have a short summary of the list of APIs which have been updated to support dask, either as a comment or as an update of the PR description? This could also be used directly in the release notes.

A few additional questions regarding integration & release:

  • what is the status in terms of functional testing with the latest state of this proposed change? Note that "Best practices for generating multiscale zarr data?" (#215) might also provide a potential use case for the functionality
  • are there any follow-up todos planned on top of this work before it can be included in an upcoming 0.x.0 release?
  • as this is probably the biggest ongoing piece of work on this library, are we happy to merge once all outstanding comments have been cleared, and possibly release an alpha/beta/release candidate to gain some experience before making a final release?

@@ -176,6 +178,7 @@ def write_multiscale(
     axes: Union[str, List[str], List[Dict[str, str]]] = None,
     coordinate_transformations: List[List[Dict[str, Any]]] = None,
     storage_options: Union[JSONDict, List[JSONDict]] = None,
+    name: str = None,

@sbesson (Member) commented Jul 27, 2022:

I was confused that this was not already implemented since write_multiscale_labels delegates name=name but I think this is handled via the **metadata kwargs.

I am unaware of any issue with being explicit, but the API doc of write_multiscale will need to be reviewed and updated accordingly. For the same API, the docstring of pyramid could also be updated to be explicit about the type of arrays supported.

@@ -573,7 +703,7 @@ def write_multiscale_labels(
 
 
 def write_labels(
-    labels: np.ndarray,
+    labels: Union[np.ndarray, da.Array],

Member:

Same as above, could use ArrayLike if we define this alias in the class

@@ -125,6 +130,30 @@ def __create_group(
             series.append({"path": path})
         return grp
 
+    def resize_image(self, image: ArrayLike) -> ArrayLike:

Member:

This method is different from all the others on the Scaler class, as it returns a single array (not a pyramid of arrays). It is only used if the image in writer.write_image(image, group, scaler) is a dask array; otherwise, mip = scaler.nearest(image) is used to create a pyramid for non-dask data. If you pass in a scaler instance that doesn't implement scaler.resize_image(), then write_image(dask_image, group, scaler) will fail. This is unexpected and not documented anywhere.

When a scaler instance is passed in to write_image(), none of the other methods on it are used (gaussian(), laplacian(), local_mean(), zoom()). If you wish to use one of these methods for downsampling (non-dask only), you need to do it before passing the data to write_multiscale(). So maybe these methods should be on a different class than the Scaler that is used for write_image(i, g, scaler), and the same scaling method should be used for dask and non-dask data (returning a single image, not a pyramid)?

The write_image() docs for the scaler parameter state "If this is None, no downsampling will be performed", but that isn't true for dask image data: this will fail if scaler is None.

Member:

I guess that none of these issues is a blocker. Improving the Scaler documentation and usage, using scaler.resize_image() instead of scaler.nearest() for dask AND non-dask data, possibly removing other methods from the class, etc. could come in a follow-up PR (even a follow-up release). cc @joshmoore?

Probably the most important fix should be handling write_image(dask_data, group, scaler=None) which I think would currently fail.

Contributor Author (@toloudis):

I would agree that dask versions of gaussian, laplacian, etc could be done in later PRs.
I can try to do one more pass this afternoon to make sure scaler=None does not break with dask array.
As I'm sure is true with many others, I am splitting my time among many different projects, so thanks for living with these intermittent pieces of work on this PR.

Contributor Author (@toloudis):

Had a quick look, and I am already handling scaler=None in _write_dask_image: (1) max_layer is set to 0 if scaler is None, and (2) scaler.resize_image is not called if scaler is None. (A simplified sketch follows.)
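
A simplified sketch (not the actual _write_dask_image body; plan_pyramid is a made-up name) of the scaler=None handling described above:

def plan_pyramid(image, scaler):
    max_layer = 0 if scaler is None else scaler.max_layer  # (1) no downsampling without a scaler
    levels = [image]
    for _ in range(max_layer):
        # (2) resize_image is only ever called when a scaler was provided
        levels.append(scaler.resize_image(levels[-1]))
    return levels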

@toloudis (Contributor, Author):

I'm wondering what the status is on this PR.
If this is still on the good path to being merged, what work is remaining? Maybe time for a re-review?
(I'm eager to get an aicsimageio writer published using this code.)
Also, if it would make life easier for additional contributions, I'm happy to have all the changes moved to an internal branch or someone else's fork. I'm not sure what the thoughts are on merging and then doing additional refinements in follow-up PRs.

@sbesson (Member) commented Aug 26, 2022

@toloudis thanks for re-upping the priority of this now that everyone is more or less back from holidays. The questions you bring up pretty much echo what I said in #192 (review) so I believe we are all on the same page although a bit looking at each other :)

As a preamble, I am transitioning to a new role where I may or may not remain actively involved in the code base. In this context, it feels sensible to let you guys drive the next set of additions to the library and I'll make sure that Will and the rest of the OME team have the necessary permissions to do so.

My personal opinion is that dask support is an in-demand feature and this is a large contribution, so there is interest in prioritizing its integration and planning its official release. Both you and @will-moore have the best knowledge of the state of things. Assuming you are both happy with the current state, my inclination would be to get this merged into master and immediately cut a pre-release so that various use cases can start consuming it and providing feedback and/or contributions.

This leaves the wider question of the roadmap/timeline towards a final release with dask support. If that's something that cannot be decided at this time, maybe it is worth creating a 0.6.0 milestone so that we can capture follow-ups from this PR, as well as future feedback, as issues.

@will-moore (Member):

Just tested this again using the sample at #219 and this seems to be working.

We'll need some follow-up additions to the documentation, but this is looking good for a pre-release as @sbesson suggested.

@will-moore merged commit 4f9cd24 into ome:master on Aug 26, 2022
@jakirkham:

Thanks all! It is great to see this in 😄

Comment on lines +28 to +29
ListOfArrayLike = Union[List[da.Array], List[np.ndarray]]
ArrayLike = Union[da.Array, np.ndarray]


Perhaps in the future these could be some kind of general Array (once that is more fleshed out).

@will-moore (Member):

This is now released in ome-zarr-py 0.6.0.
Thanks!
