Refine cluster example for tutorial

cmalinmayor committed Jun 27, 2024
1 parent afcaae5 commit ae31f48
Showing 4 changed files with 90 additions and 126 deletions.
128 changes: 65 additions & 63 deletions examples/tutorial.py
@@ -420,102 +420,104 @@ def smooth_in_block_dask(x):

# %% [markdown]
# # Distributing on the Cluster
-# TODO
+# While daisy can run locally, it is designed to shine in a cluster computing environment. The only information passed between the scheduler and the workers is Blocks, which are extremely lightweight and are communicated over TCP. Therefore, workers can be distributed across the cluster with minimal communication overhead.
+#
+# Let's re-do our smoothing, but this time run each worker as a completely separate subprocess, as would be needed on a cluster. First, we prepare the output dataset.
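
# %% [markdown]
# (As an aside, you can get a feel for just how lightweight a Block is by constructing one directly and printing it. This is a sketch, assuming `daisy.Block(total_roi, read_roi, write_roi)` as in recent daisy versions; `total_read_roi`, `read_roi`, and `block_roi` are defined earlier in the tutorial.)

# %%
# a Block carries only a few ROIs and bookkeeping IDs, so it is cheap to send over TCP
example_block = daisy.Block(total_read_roi, read_roi, block_roi)
print(example_block)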

# %%
-# tutorial worker code in tutorial_worker.py

+# first, prepare the dataset
+prepare_smoothing_ds("smoothed_subprocess")

-# %%
-prepare_ds(
-    "sample_data.zarr",
-    "smoothed_subprocess",
-    total_roi=total_roi,
-    voxel_size=daisy.Coordinate((1, 1)),
-    dtype=raw_data_float.dtype,
-    write_size=block_size,
-    num_channels=n_channels,
-)
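
# %% [markdown]
# (`prepare_smoothing_ds` is a small helper defined earlier in the tutorial, above this excerpt. As a hedged sketch, assuming it simply wraps the `prepare_ds` call shown above with a configurable group name:)

# %%
# hypothetical reconstruction of the helper; the real definition appears earlier in tutorial.py
def prepare_smoothing_ds(group):
    prepare_ds(
        "sample_data.zarr",
        group,
        total_roi=total_roi,
        voxel_size=daisy.Coordinate((1, 1)),
        dtype=raw_data_float.dtype,
        write_size=block_size,
        num_channels=n_channels,
    )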

# %% [markdown]
# Then, we prepare our process function. This time, it has two parts. The first part is the function defined in the cell below, which simply calls `subprocess.run`, either locally or through bsub as an example compute environment. The second part is the external Python script that the `subprocess.run` call actually executes.

# %%
# new process function to start the worker subprocess
def start_subprocess_worker(cluster="local"):
    import subprocess

    # the worker script reads its config path from sys.argv[1], so pass it here
    if cluster == "bsub":
        # this is where you define your cluster arguments specific to your task (gpus, cpus, etc)
        num_cpus_per_worker = 1
        subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker.py", "tutorial_config.json"])
    elif cluster == "local":
        subprocess.run(["python", "./tutorial_worker.py", "tutorial_config.json"])
    else:
        raise ValueError("Only bsub and local currently supported for this tutorial")



-# %%
-# scheduler
from functools import partial
-
-# note: Must be on submit node to run this with bsub argument
-tutorial_task = daisy.Task(
-    "smoothing_subprocess",
-    total_roi=total_read_roi,
-    read_roi=read_roi,
-    write_roi=block_roi,
-    process_function=partial(start_subprocess_worker, "bsub"),
-    num_workers=2,
-    fit="shrink",
-)
-
-daisy.run_blockwise([tutorial_task])
-
-# %%
-plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower")

# %% [markdown]
# ## Passing "arguments" to your subprocess

-# %%
-prepare_ds(
-    "sample_data.zarr",
-    "smoothed_subprocess_config",
-    total_roi=total_write_roi,
-    voxel_size=daisy.Coordinate((1, 1)),
-    dtype=raw_data_float.dtype,
-    write_size=block_size,
-    num_channels=n_channels,
-)


-# %%
-def start_subprocess_worker_config(cluster="local"):
-    import subprocess
-    if cluster == "bsub":
-        num_cpus_per_worker = 4
-        subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker_config.py", "tutorial_config.json"])
-    elif cluster == "local":
-        subprocess.run(["python", "./tutorial_worker_config.py", "tutorial_config.json"])
-    else:
-        raise ValueError("Only bsub and local currently supported for this tutorial")
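
# %% [markdown]
# The worker script reads its parameters from `tutorial_config.json`, which is checked into the examples folder. If you would rather generate it from the notebook, a minimal sketch that writes only the keys the worker actually reads:

# %%
import json

worker_config = {
    "input_zarr": "sample_data.zarr",
    "output_zarr": "sample_data.zarr",
    "input_group": "raw",
    "output_group": "smoothed_subprocess",
    "sigma": 5,
}
# write the config next to the worker script so the subprocess can find it by relative path
with open("tutorial_config.json", "w") as f:
    json.dump(worker_config, f)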
+# %% [markdown]
+# Code from tutorial_worker.py, copied here for convenience (Note: running this cell won't run the code, because it is a markdown cell)
+# ``` python
+# import daisy
+# import logging
+# import time
+# from funlib.persistence.arrays import open_ds, Array
+# from skimage import filters
+# import sys
+# import json
+#
+#
+# # This function is the same as the local function, but we can pass as many different arguments as we want, and we don't need to import inside it
+# def smooth_in_block(block: daisy.Block, config: dict):
+#     sigma = config["sigma"]
+#     raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",)
+#     data = raw_ds.to_ndarray(block.read_roi, fill_value=0)
+#     smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0)
+#     output_ds = open_ds(config["output_zarr"], config["output_group"], 'a')
+#     smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1))
+#     output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi)
+#
+#
+# if __name__ == "__main__":
+#     # load a config path or other parameters from the sysargs (recommended to use argparse argument parser for anything more complex)
+#     config_path = sys.argv[1]
+#
+#     # load the config
+#     with open(config_path) as f:
+#         config = json.load(f)
+#
+#     # simulate long setup time (e.g. loading a model)
+#     time.sleep(20)
+#
+#     # set up the daisy client (this is done by daisy automatically in the local example)
+#     # it depends on environment variables to determine configuration
+#     client = daisy.Client()
+#
+#     while True:
+#         # ask for a block from the scheduler
+#         with client.acquire_block() as block:
+#
+#             # The scheduler will return None when there are no more blocks left
+#             if block is None:
+#                 break
+#
+#             # process your block!
+#             # Note: you can now define whatever function signature you want, rather than being limited to one block argument
+#             smooth_in_block(block, config)
+# ```


# %% [markdown]
# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. Our process function no longer accepts a block as input; instead, it takes no arguments and is expected to explicitly request blocks. This means that rather than spawning one worker per block, the workers persist for the full time the task is running, and can request, process, and return many blocks.
#
# This is particularly helpful when worker startup is expensive: loading saved network weights can cost more than actually predicting for one block, so you would not want to load the model separately for each block. We have simulated this with a `time.sleep()` in the worker's setup, so when you run the next cell, it should take about 20 seconds to start up, after which the blocks should process quickly.
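
# %% [markdown]
# One more detail worth knowing: the bare `daisy.Client()` in the worker finds the scheduler through environment variables that daisy sets when launching each worker and that the subprocess inherits. (The sketch below assumes the variable is named `DAISY_CONTEXT`, as in current daisy releases.)

# %%
import os

# outside of a worker process this prints the fallback; inside a worker,
# daisy.Client() reads the scheduler's address from the inherited variable
print(os.environ.get("DAISY_CONTEXT", "DAISY_CONTEXT not set (not inside a daisy worker)"))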

# %%
# note: you must be on a submit node to run this with the bsub argument
tutorial_task = daisy.Task(
-    "smoothing_subprocess_config",
+    "smoothing_subprocess",
    total_roi=total_read_roi,
    read_roi=read_roi,
    write_roi=block_roi,
-    process_function=partial(start_subprocess_worker_config, "local"),
+    process_function=partial(start_subprocess_worker, "local"),
    num_workers=2,
    fit="shrink",
)

daisy.run_blockwise([tutorial_task])

# %%
-plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess_config'][:].transpose(1, 2, 0), origin="lower")
+plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower")

# %% [markdown]
# # Important Features
2 changes: 1 addition & 1 deletion examples/tutorial_config.json
@@ -2,6 +2,6 @@
"input_zarr": "sample_data.zarr",
"output_zarr": "sample_data.zarr",
"input_group": "raw",
"output_group": "smoothed_subprocess_config",
"output_group": "smoothed_subprocess",
"sigma": 5
}
43 changes: 24 additions & 19 deletions examples/tutorial_worker.py
@@ -3,39 +3,44 @@
import time
from funlib.persistence.arrays import open_ds, Array
from skimage import filters
-import time
+import sys
+import json

-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)

-def smooth_in_block(block: daisy.Block):
-    sigma = 5.0
-
-    raw_ds = open_ds('sample_data.zarr', 'raw', "r",)
+# This function is the same as the local function, but we can pass as many different arguments as we want, and we don't need to import inside it
+def smooth_in_block(block: daisy.Block, config: dict):
+    sigma = config["sigma"]
+    raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",)
    data = raw_ds.to_ndarray(block.read_roi, fill_value=0)
    smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0)
-
-    output_ds = open_ds('sample_data.zarr', 'smoothed_subprocess', 'a')
-
+    output_ds = open_ds(config["output_zarr"], config["output_group"], 'a')
    smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1))
    output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi)


if __name__ == "__main__":
+    # load a config path or other parameters from the sysargs (recommended to use argparse argument parser for anything more complex)
+    config_path = sys.argv[1]
+
+    # load the config
+    with open(config_path) as f:
+        config = json.load(f)
+
+    # simulate long setup time (e.g. loading a model)
+    time.sleep(20)
+
+    # set up the daisy client (this is done by daisy automatically in the local example)
+    # it depends on environment variables to determine configuration
    client = daisy.Client()
-    print("Client:", client)

-    # simulate long setup time (e.g. loading a model)
-    time.sleep(50)

    while True:
-        logger.info("getting block")
+        # ask for a block from the scheduler
        with client.acquire_block() as block:

+            # The scheduler will return None when there are no more blocks left
            if block is None:
                break

-            logger.info(f"got block {block}")
-            smooth_in_block(block)
-
-            logger.info(f"releasing block: {block}")
+            # process your block!
+            # Note: you can now define whatever function signature you want, rather than being limited to one block argument
+            smooth_in_block(block, config)
43 changes: 0 additions & 43 deletions examples/tutorial_worker_config.py

This file was deleted.
