From 1e22087fea2d383795d9a4f6ed05f846b0f4e059 Mon Sep 17 00:00:00 2001
From: Caroline Malin-Mayor
Date: Wed, 26 Jun 2024 21:59:02 -0400
Subject: [PATCH] Add fault tolerance, task chaining, and read write conflict
 examples

---
 examples/tutorial.py               | 428 +++++++++++++++++++++++++++--
 examples/tutorial_config.json      |   7 +
 examples/tutorial_worker.py        |   2 +
 examples/tutorial_worker_config.py |  43 +++
 4 files changed, 464 insertions(+), 16 deletions(-)
 create mode 100644 examples/tutorial_config.json
 create mode 100644 examples/tutorial_worker_config.py

diff --git a/examples/tutorial.py b/examples/tutorial.py
index c034bb57..031ae9d6 100644
--- a/examples/tutorial.py
+++ b/examples/tutorial.py
@@ -30,7 +30,7 @@
# multiprocessing.set_start_method("fork")

# %% [markdown]
-# ## Building Blocks
+# # Building Blocks

# %% [markdown]
# Daisy is designed for processing volumetric data. Therefore, it has specific ways to describe locations in a volume. We will demonstrate the common terms and utilities using this image of astronaut Eileen Collins.
@@ -44,7 +44,7 @@ axes_image = plt.imshow(raw_data, zorder=1, origin="lower")

# %% [markdown]
-# ### Coordinate
+# ## Coordinate
# - A daisy Coordinate is essentially a tuple with one value per spatial dimension. In our case, the Coordinates are two-dimensional.
# - Daisy coordinates can represent points in the volume, or distances in the volume.
# - Daisy mostly passes around abstract placeholders of the data. Therefore, a Coordinate does not contain data; it is simply a pointer to a location in a volume.
@@ -74,7 +74,7 @@ def display_coord(axes, coord, color):
figure

# %% [markdown]
-# ### Roi
+# ## Roi
# A Roi (Region of interest) is a bounding box in a volume. It is defined by two Coordinates:
# - offset: the starting corner of the bounding box relative to the origin
# - shape: the extent of the bounding box in each dimension
@@ -115,7 +115,7 @@ def fresh_image():
    display_roi(figure.axes[0], roi, color=color)

# %% [markdown]
-# ### Array
+# ## Array
# So far we have seen how to specify regions of the data with Rois and Coordinates, which do not contain any data. However, eventually you will need to access the actual data using your Rois! For this, we use [`funlib.persistence.arrays.Array`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/array.py). If you are familiar with dask, this is the daisy equivalent of dask arrays.
#
# The key feature of the `funlib.persistence.arrays.Array` class is that you can slice it with Rois, along with normal numpy-like slicing. However, in order to support this type of slicing, we also need to know the Roi of the whole Array. Here we show you how to create an array from our raw data that is held in memory as a numpy array. However, we highly recommend using a zarr backend, and will show this in our simple example next!
@@ -161,7 +161,7 @@ def fresh_image():
plt.imshow(body_data.transpose(1, 2, 0), origin="lower")

# %% [markdown]
-# ### Block
+# ## Block
#
# Daisy is a blockwise task scheduler. Therefore, the concept of a block is central to Daisy. To efficiently process large volumes, Daisy splits the whole volume into a set of adjacent blocks that cover the whole image. These blocks are what gets passed between the scheduler and the workers.
#
@@ -195,11 +195,11 @@ def fresh_image():
# You may be wondering why the block has a read roi and a write roi - this will be illustrated next in our simple daisy example!
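+# %% [markdown]
+# Before moving on, here is a minimal, self-contained sketch (the sizes are arbitrary and it is not part of the tutorial pipeline) of how a Block combines a total roi, a read roi, and a write roi:

+# %%
+sketch_total_roi = daisy.Roi(offset=(0, 0), shape=(20, 20))
+sketch_write_roi = daisy.Roi(offset=(4, 4), shape=(4, 4))
+# grow the write roi by 2 pixels of context on every side to get the read roi
+sketch_read_roi = sketch_write_roi.grow(2, 2)
+sketch_block = daisy.Block(
+    total_roi=sketch_total_roi,
+    read_roi=sketch_read_roi,
+    write_roi=sketch_write_roi,
+)
+print(sketch_block.read_roi, sketch_block.write_roi, sketch_block.block_id)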
# %% [markdown]
-# ## A Simple Example: Local Smoothing
+# # A Simple Example: Local Smoothing
# In this example, we will use daisy to gaussian smooth our sample image blockwise and in parallel. It is a toy problem, but it walks through the full daisy workflow: preparing an output dataset, writing a process function, and running a blockwise task.

# %% [markdown]
-# ### Dataset Preparation
+# ## Dataset Preparation
# As mentioned earlier, we highly recommend using a zarr backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, as long as the write_roi of each block is aligned with the zarr chunks (for example, by making the block size a multiple of the chunk size).
#
# Here we prepare an output dataset with the same Roi and voxel size as the input, using the block size as the chunk size.
@@ -235,13 +235,14 @@


# %% [markdown]
-# ### Define our Process Function
+# ## Define our Process Function
# The process function is what each worker runs on each block: it reads the raw data in the block's read_roi, smooths it, and writes the result into the output dataset at the block's write_roi.

# %%
def smooth_in_roi(block: daisy.Block):
    from funlib.persistence.arrays import open_ds
    from skimage import filters
+
    sigma = 5.0
    raw_ds = open_ds('sample_data.zarr', 'raw', "r",)
    data = raw_ds.to_ndarray(block.read_roi)
@@ -257,7 +258,7 @@ def smooth_in_roi(block: daisy.Block):
plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower")

# %% [markdown]
-# ### Run daisy with local multiprocessing
+# ## Run daisy with local multiprocessing
# To run the task, we wrap the process function in a daisy.Task, describe how the total_roi is split into blocks, and let daisy distribute the blocks over local worker processes.

# %%
@@ -269,6 +270,7 @@
        read_roi=block_roi,
        write_roi=block_roi,
        fit="shrink",
+        num_workers=5,
    )
])
@@ -277,7 +279,7 @@
plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower")

# %% [markdown]
-# #### Take 2: Add context!
+# ### Take 2: Add context!

# %%
prepare_ds(
@@ -318,6 +320,7 @@ def smooth_in_roi_with_context(block: daisy.Block):
        read_roi=block_roi,
        write_roi=block_roi,
        fit="shrink",
+        num_workers=5,
    )
])
@@ -326,7 +329,7 @@ def smooth_in_roi_with_context(block: daisy.Block):
plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_with_context'][:].transpose(1, 2, 0), origin="lower")

# %% [markdown]
-# #### Take 3: The Daisy Way
+# ### Take 3: The Daisy Way

# %%
prepare_ds(
@@ -370,6 +373,7 @@ def smooth_in_block(block: daisy.Block):
        read_roi=read_roi,
        write_roi=block_roi,
        fit="shrink",
+        num_workers=5,
    )
])
@@ -378,7 +382,7 @@ def smooth_in_block(block: daisy.Block):
plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_blockwise'][:].transpose(1, 2, 0), origin="lower")

# %% [markdown]
-# ### Conclusion: Dask and Daisy
+# ## Conclusion: Dask and Daisy
#
# Phew! We managed to smooth the image, and along the way you learned the basics of Daisy. In this example, we only parallelized the processing using our local computer's resources, and our "volume" was very small.
#
@@ -399,17 +403,17 @@
sigma = 5.0
context = int(sigma) * 2

-def smooth_in_block(x):
+def smooth_in_block_dask(x):
    return filters.gaussian(x, sigma=sigma, channel_axis=0)

-smoothed = raw.map_overlap(smooth_in_block, depth=(0, context, context))
+smoothed = raw.map_overlap(smooth_in_block_dask, depth=(0, context, context))
plt.imshow(smoothed.transpose((1, 2, 0)), origin="lower")

# %% [markdown]
# Notice that there is no saving of the output in a zarr - we simply receive the whole thing in memory. On this small example, this works very well, but when the whole volume is too large to fit in memory, we need to save the output to disk, as done in daisy. Additionally, dask generates more overhead than daisy, which can become an issue for larger and more complex workflows.
With daisy, the only object that is sent between the scheduler and the worker is a Block, which, as we have learned, is lightweight.

# %% [markdown]
-# ## Distributing on the Cluster
+# # Distributing on the Cluster
# So far, daisy spawned its workers as local processes. To scale beyond one machine, the process function can instead launch each worker as a separate script - for example, by submitting it to a cluster scheduler such as LSF with bsub. The worker script (see tutorial_worker.py) connects back to the daisy scheduler with a daisy.Client and requests blocks in a loop.

# %%
@@ -446,7 +450,7 @@ def start_subprocess_worker(cluster="local"):
# scheduler
from functools import partial

-# note: Must be on submit node to run this
+# note: Must be on submit node to run this with bsub argument
tutorial_task = daisy.Task(
    "smoothing_subprocess",
    total_roi=total_roi,
@@ -462,4 +466,396 @@ def start_subprocess_worker(cluster="local"):

# %%
plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower")

+# %% [markdown]
+# ## Passing "arguments" to your subprocess
+# Since the worker now runs as a separate script, we cannot pass Python arguments to it directly. A simple solution is to write the arguments to a config file (here, tutorial_config.json) and pass the path to that file on the worker's command line.

+# %%
+prepare_ds(
+    "sample_data.zarr",
+    "smoothed_subprocess_config",
+    total_roi=total_roi,
+    voxel_size=daisy.Coordinate((1,1)),
+    dtype=raw_data_float.dtype,
+    write_size=block_size,
+    num_channels=n_channels,
+)
+
+
+# %%
+def start_subprocess_worker_config(cluster="local"):
+    import subprocess
+    if cluster == "bsub":
+        num_cpus_per_worker = 4
+        subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker_config.py", "tutorial_config.json"])
+    elif cluster == "local":
+        subprocess.run(["python", "./tutorial_worker_config.py", "tutorial_config.json"])
+    else:
+        raise ValueError("Only bsub and local currently supported for this tutorial")
+
+
+
+# %%
+# note: Must be on submit node to run this with bsub argument
+tutorial_task = daisy.Task(
+    "smoothing_subprocess_config",
+    total_roi=total_roi,
+    read_roi=read_roi,
+    write_roi=block_roi,
+    process_function=partial(start_subprocess_worker_config, "local"),
+    num_workers=2,
+    fit="shrink",
+)
+
+daisy.run_blockwise([tutorial_task])
+
+# %%
+plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess_config'][:].transpose(1, 2, 0), origin="lower")
+
+# %% [markdown]
+# # Important Features
+
+# %% [markdown]
+# ## Fault tolerance and the pre-check function
+
+# %%
+prepare_ds(
+    "sample_data.zarr",
+    "fault_tolerance",
+    total_roi=total_roi,
+    voxel_size=daisy.Coordinate((1,1)),
+    dtype=raw_data_float.dtype,
+    write_size=block_size,
+    num_channels=n_channels,
+)
+
+
+# %%
+# simulate failing 50% of the time
+def smooth_in_block_with_failure(block: daisy.Block):
+    import random
+
+    if random.random() < 0.5:
+        raise ValueError("Simulating random failure")
+
+    from funlib.persistence.arrays import open_ds, Array
+    from skimage import filters
+
+    sigma = 5.0
+
+    raw_ds = open_ds('sample_data.zarr', 'raw', "r",)
+    data = raw_ds.to_ndarray(block.read_roi)
+    smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0)
+
+    output_ds = open_ds('sample_data.zarr', 'fault_tolerance', 'a')
+
+    smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1))
+    output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi)
+
+
+# %%
+sigma = 5
+context = int(sigma) * 2
+read_roi = block_roi.grow(context, context)
+
+daisy.run_blockwise([
+    daisy.Task(
+        "fault tolerance test",
+        process_function=smooth_in_block_with_failure,
+        total_roi=total_roi,
+        read_roi=read_roi,
+        write_roi=block_roi,
+        fit="shrink",
+        read_write_conflict=False,
+        num_workers=5,
+    )
+])
+
+# %%
+plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower")
+
+# %% [markdown]
+# Each block fails 50% of the time, so why is so much more than 50% of the volume done?
Answer: "max retries" +# TODO: Also mention log files here + +# %% +root = zarr.open("sample_data.zarr", 'a') +del root['fault_tolerance'] +prepare_ds( + "sample_data.zarr", + "fault_tolerance", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + +daisy.run_blockwise([ + daisy.Task( + "fault tolerance test", + process_function=smooth_in_block_with_failure, + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + read_write_conflict=False, + max_retries=1, + num_workers=3, + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + + +# %% [markdown] +# If we re-run enough times, eventually all the holes will fill. But, we can do something smarter! + +# %% +def check_complete(output_group, block): + from funlib.persistence.arrays import open_ds + import numpy as np + output_ds = open_ds('sample_data.zarr', output_group, 'r') + if np.max(output_ds.to_ndarray(block.write_roi)) > 0: + return True + else: + return False + + +daisy.run_blockwise([ + daisy.Task( + "fault tolerance test", + process_function=smooth_in_block_with_failure, + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + read_write_conflict=False, + max_retries=1, + check_function=partial(check_complete, "fault_tolerance") + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + + +# %% [markdown] +# Note: your pre-check function has to be faster than your actual function for this to be worth it. We recommend saving the block id as a file in a shared file system or database at the end of the worker function. + +# %% [markdown] +# ## Task chaining + +# %% [markdown] +# Say we have a function to segment out instances of blue objects in an image, and we want to apply it after smoothing. We can define two tasks and run them sequentially in the scheduler. 
+
+# %%
+def smooth_in_block(output_group, block: daisy.Block):
+    from funlib.persistence.arrays import open_ds, Array
+    from skimage import filters
+
+    sigma = 5.0
+
+    raw_ds = open_ds('sample_data.zarr', 'raw', "r",)
+    data = raw_ds.to_ndarray(block.read_roi)
+    smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0)
+
+    output_ds = open_ds('sample_data.zarr', output_group, 'a')
+
+    smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1))
+    output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi)
+
+
+# %%
+# %pip install opencv-python
+
+# %%
+def segment_blue_objects(input_group, output_group, block):
+    import cv2
+    from funlib.persistence.arrays import open_ds, Array
+    import numpy as np
+    import skimage
+
+    input_ds = open_ds('sample_data.zarr', input_group, "r",)
+    data = input_ds.to_ndarray(block.read_roi)
+
+    back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8)
+    cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR)
+    hsv_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2HSV)
+    # Define the color range for detection
+    lower_blue = np.array([100,30,0])
+    upper_blue = np.array([150,255,255])
+    # Threshold the image to get only blue colors
+    mask = cv2.inRange(hsv_image, lower_blue, upper_blue)
+    mask = mask.astype(np.uint16)
+
+    mask = mask // 255
+    labels = skimage.measure.label(mask)
+    # offset the block-local labels by a block-specific value, so that ids do not
+    # repeat between blocks (assumes fewer than 100 objects per block)
+    block_id_mask = mask * (block.block_id[1] * 100)
+    labels = labels + block_id_mask
+
+    output_ds = open_ds('sample_data.zarr', output_group, 'a')
+    output_ds[block.write_roi] = labels
+
+
+# %%
+
+prepare_ds(
+    "sample_data.zarr",
+    "smoothed_for_seg",
+    total_roi=total_roi,
+    voxel_size=daisy.Coordinate((1,1)),
+    dtype=raw_data_float.dtype,
+    write_size=block_size,
+    num_channels=n_channels,
+)
+
+sigma = 5
+context = int(sigma) * 2
+read_roi = block_roi.grow(context, context)
+
+smoothing_task = daisy.Task(
+    "smooth_for_seg",
+    process_function=partial(smooth_in_block, "smoothed_for_seg"),
+    total_roi=total_roi,
+    read_roi=read_roi,
+    write_roi=block_roi,
+    fit="shrink",
+    num_workers=5,
+    read_write_conflict=False,
+    check_function=partial(check_complete, "smoothed_for_seg")
+)
+
+# %%
+# you can have different block sizes in different tasks
+seg_block_roi = daisy.Roi((0,0), (128, 128))
+
+prepare_ds(
+    "sample_data.zarr",
+    "blue_objects",
+    total_roi=total_roi,
+    voxel_size=daisy.Coordinate((1,1)),
+    dtype=np.uint16,  # uint8 would overflow once the label ids include the block id offset
+    write_size=seg_block_roi.shape,
+)
+
+seg_task = daisy.Task(
+    "segmentation",
+    process_function=partial(segment_blue_objects, "smoothed_for_seg", "blue_objects"),
+    total_roi=total_roi,
+    read_roi=seg_block_roi,
+    write_roi=seg_block_roi,
+    fit="shrink",
+    read_write_conflict=False,
+    num_workers=5,
+    upstream_tasks=[smoothing_task],
+)
+
+# %%
+daisy.run_blockwise([smoothing_task, seg_task])
+
+# %%
+from skimage.color import label2rgb
+figure, axes = plt.subplots(1, 2)
+axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower")
+axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower")
+
+# %% [markdown]
+# ## Process functions that need to read their neighbor's output (and the "read_write_conflict" flag)
+
+# %% [markdown]
+# In the result above, an object that crosses a block boundary receives a different id in each block. How can we resolve the labels of adjacent blocks?
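+# %% [markdown]
+# One approach: grow the read_roi so that each block can see a bit of its neighbors' output, and relabel its own objects to match any labels already written there. For this to be safe, a block must not run at the same time as a neighbor whose write_roi overlaps its read_roi. Setting `read_write_conflict=True` on the Task asks the daisy scheduler to guarantee exactly that. The toy snippet below (illustrative values only, separate from the pipeline) shows why the conflict arises: a read_roi grown by 10 pixels overlaps the write_roi of the adjacent block.

+# %%
+write_roi_a = daisy.Roi((0, 0), (128, 128))
+write_roi_b = daisy.Roi((0, 128), (128, 128))  # the block directly to the right
+read_roi_a = write_roi_a.grow(10, 10)
+print(read_roi_a.intersects(write_roi_b))  # True: block a reads where block b writes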
+
+# %%
+def segment_blue_objects_with_context(input_group, output_group, block):
+    import cv2
+    from funlib.persistence.arrays import open_ds, Array
+    import numpy as np
+    import skimage
+    import time
+
+    def get_overlapping_labels(array1, array2):
+        array1 = array1.flatten()
+        array2 = array2.flatten()
+        # get indices where both are not zero (ignore background)
+        # this speeds up computation significantly
+        non_zero_indices = np.logical_and(array1, array2)
+        flattened_stacked = np.array([array1[non_zero_indices], array2[non_zero_indices]])
+        intersections = np.unique(flattened_stacked, axis=1)
+        return intersections  # 2 x n ndarray of (new_label, existing_label) columns
+
+    input_ds = open_ds('sample_data.zarr', input_group, "r",)
+    output_ds = open_ds('sample_data.zarr', output_group, 'a')
+    data = input_ds.to_ndarray(block.read_roi)
+    existing_labels = output_ds.to_ndarray(block.read_roi)
+
+    back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8)
+    cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR)
+    hsv_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2HSV)
+    # Define the color range for detection
+    lower_blue = np.array([100,30,0])
+    upper_blue = np.array([150,255,255])
+    # Threshold the image to get only blue colors
+    mask = cv2.inRange(hsv_image, lower_blue, upper_blue)
+    mask = mask.astype(np.uint16)
+
+    mask = mask // 255
+    labels = skimage.measure.label(mask)
+    # offset the block-local labels by a block-specific value, as before
+    block_id_mask = mask * (block.block_id[1] * 100)
+    labels = labels + block_id_mask
+
+    # if a neighbor already labeled an object in our read_roi, adopt its label
+    # note: this simple relabeling only works if objects never span multiple rows/columns
+    # of blocks. If you have long objects like neurons, you need to do true agglomeration
+    intersections = get_overlapping_labels(labels, existing_labels)
+    for index in range(intersections.shape[1]):
+        new_label, old_label = intersections[:, index]
+        labels[labels == new_label] = old_label
+
+    time.sleep(0.5)  # slow the workers down to make the conflict-aware scheduling visible
+    output_ds = open_ds('sample_data.zarr', output_group, 'a')
+    output_array = Array(labels, roi=block.read_roi, voxel_size=(1, 1))
+    output_ds[block.write_roi] = output_array.to_ndarray(block.write_roi)
+
+
+# %%
+seg_block_roi = daisy.Roi((0,0), (128, 128))
+seg_block_read_roi = seg_block_roi.grow(10, 10)
+root = zarr.open("sample_data.zarr", 'a')
+if 'blue_objects_with_context' in root:
+    del root['blue_objects_with_context']
+prepare_ds(
+    "sample_data.zarr",
+    "blue_objects_with_context",
+    total_roi=total_roi,
+    voxel_size=daisy.Coordinate((1,1)),
+    dtype=np.uint16,
+    write_size=seg_block_roi.shape,
+)
+
+seg_task = daisy.Task(
+    "segmentation_with_context",
+    process_function=partial(segment_blue_objects_with_context, "smoothed_for_seg", "blue_objects_with_context"),
+    total_roi=total_roi,
+    read_roi=seg_block_read_roi,
+    write_roi=seg_block_roi,
+    fit="shrink",
+    read_write_conflict=True,
+    num_workers=10,
+    upstream_tasks=[smoothing_task],
+)
+daisy.run_blockwise([seg_task])
+
+# %%
+from skimage.color import label2rgb
+figure, axes = plt.subplots(1, 2)
+axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower")
+axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects_with_context'][:]), origin="lower")
+
+# %%
diff --git a/examples/tutorial_config.json b/examples/tutorial_config.json
new file mode 100644
index 00000000..ddd81faa
--- /dev/null
+++ b/examples/tutorial_config.json
@@ -0,0 +1,7 @@
+{
+    "input_zarr": "sample_data.zarr",
+    "output_zarr": "sample_data.zarr",
"input_group": "raw", + "output_group": "smoothed_subprocess_config", + "sigma": 5 +} \ No newline at end of file diff --git a/examples/tutorial_worker.py b/examples/tutorial_worker.py index d0791a93..80f650c9 100644 --- a/examples/tutorial_worker.py +++ b/examples/tutorial_worker.py @@ -3,6 +3,7 @@ import time from funlib.persistence.arrays import open_ds, Array from skimage import filters +import time logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -32,6 +33,7 @@ def smooth_in_block(block: daisy.Block): break logger.info(f"got block {block}") + time.sleep(0.5) smooth_in_block(block) logger.info(f"releasing block: {block}") diff --git a/examples/tutorial_worker_config.py b/examples/tutorial_worker_config.py new file mode 100644 index 00000000..3c4ae65b --- /dev/null +++ b/examples/tutorial_worker_config.py @@ -0,0 +1,43 @@ +import daisy +import logging +import time +from funlib.persistence.arrays import open_ds, Array +from skimage import filters +import sys +import json + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def smooth_in_block(block: daisy.Block, config: dict): + sigma = config["sigma"] + + raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds(config["output_zarr"], config["output_group"], 'a') + + smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +if __name__ == "__main__": + config_path = sys.argv[1] + with open(config_path) as f: + config = json.load(f) + + client = daisy.Client() + print("Client:", client) + + while True: + logger.info("getting block") + with client.acquire_block() as block: + + if block is None: + break + + logger.info(f"got block {block}") + smooth_in_block(block, config) + + logger.info(f"releasing block: {block}")