Skip to content

Commit

Permalink
Use pbapply. Add vignette
Browse files Browse the repository at this point in the history
  • Loading branch information
keller-mark committed Jun 21, 2024
1 parent b978a5c commit ac03900
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 57 deletions.
5 changes: 3 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Suggests:
crul,
Rarr,
vcr (>= 0.6.0),
foreach,
doParallel,
pbapply,
parallel,
future,
bench
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,3 @@ export(zarr_open_array)
export(zarr_open_group)
export(zarr_save_array)
export(zb_slice)
importFrom(foreach,"%dopar%")
21 changes: 19 additions & 2 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,28 @@ pizzarr_option_defaults <- list(
pizzarr.parallel_write_enabled = FALSE
)

#' Normalize the value of a parallel read/write option.
#'
#' Converts the raw value of `pizzarr.parallel_read_enabled` /
#' `pizzarr.parallel_write_enabled` (or of the matching environment
#' variables, which arrive as strings) into a canonical form.
#'
#' @param val A length-one logical, numeric, or character value. `NULL`,
#'   zero-length, and `NA` values are treated as "disabled".
#' @return The string `"future"`, a logical scalar, or an integer scalar
#'   greater than 1. Values that cannot be interpreted produce a warning
#'   and `FALSE`.
#' @keywords internal
parse_parallel_option <- function(val) {
  # An unset option must not crash the `if` conditions below.
  if (length(val) == 0L) {
    return(FALSE)
  }
  if (length(val) != 1L) {
    stop("Parallel option must be a single value.", call. = FALSE)
  }
  if (is.na(val)) {
    return(FALSE)
  }
  if (isTRUE(val == "future")) {
    return("future")
  }

  # Strings such as "TRUE" are valid input but are not numeric; silence the
  # "NAs introduced by coercion" warning and fall through to the logical
  # interpretation instead.
  integer_val <- suppressWarnings(as.integer(val))

  if (is.na(integer_val)) {
    logical_val <- as.logical(val)
    if (is.na(logical_val)) {
      # Returning NA here would make downstream `if (is_parallel)` checks
      # fail, so fall back to sequential mode explicitly.
      warning(
        "Unrecognized parallel option value '", val,
        "'; falling back to sequential mode.",
        call. = FALSE
      )
      return(FALSE)
    }
    return(logical_val)
  }
  # 0 and 1 carry no worker count, so they collapse to FALSE / TRUE.
  if (integer_val <= 1L) {
    return(as.logical(integer_val))
  }
  integer_val
}

#' @keywords internal
from_env <- list(
PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer,
PIZZARR_PARALLEL_READ_ENABLED = as.logical,
PIZZARR_PARALLEL_WRITE_ENABLED = as.logical
PIZZARR_PARALLEL_READ_ENABLED = parse_parallel_option,
PIZZARR_PARALLEL_WRITE_ENABLED = parse_parallel_option
)

# converts e.g. jupyter.log_level to JUPYTER_LOG_LEVEL
Expand Down
7 changes: 5 additions & 2 deletions R/stores.R
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,12 @@ HttpStore <- R6::R6Class("HttpStore",
key <- item_to_key(item)
path <- paste(private$base_path, key, sep="/")

if(getOption("pizzarr.parallel_read_enabled")) {
parallel_option <- getOption("pizzarr.parallel_read_enabled")
is_parallel <- is_truthy_parallel_option(parallel_option)

if(is_parallel) {
# For some reason, the crul::HttpClient fails in parallel settings
# (when used inside foreach %dopar% loops). This alternative
# This alternative
# with HttpRequest and AsyncVaried seems to work.
# Reference: https://docs.ropensci.org/crul/articles/async.html
req <- crul::HttpRequest$new(
Expand Down
8 changes: 8 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ item_to_key <- function(item) {
key
}

#' Test whether a parallel option value enables parallel execution.
#'
#' @param val The option value: `"future"`, a logical, a number, or a string
#'   holding one of those. `NULL`, `NA`, and non-scalar values count as
#'   disabled.
#' @return `TRUE` or `FALSE` (never `NA`), so the result is safe to use
#'   directly as an `if` condition.
#' @keywords internal
is_truthy_parallel_option <- function(val) {
  # Guard first: `NULL == "future"` and `NA == "future"` are not usable as
  # `if` conditions.
  if (length(val) != 1L || is.na(val)) {
    return(FALSE)
  }
  if (isTRUE(val == "future")) {
    return(TRUE)
  }
  # Non-numeric strings coerce to NA with a warning; handle them below.
  integer_val <- suppressWarnings(as.integer(val))
  if (is.na(integer_val)) {
    # Covers logical-looking strings such as "TRUE" / "false".
    return(isTRUE(as.logical(val)))
  }
  integer_val != 0L
}

# Look up `key` in the `metadata` element of the object returned by
# `store$get_consolidated_metadata()`.
# Returns whatever `[[key]]` yields on that element.
try_from_zmeta <- function(key, store) {
  consolidated <- store$get_consolidated_metadata()
  entries <- consolidated$metadata
  entries[[key]]
}
Expand Down
68 changes: 38 additions & 30 deletions R/zarr-array.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#' The Zarr Array class.
#' @title ZarrArray Class
#' @docType class
#' @importFrom foreach %dopar%
#' @description
#' Instantiate an array from an initialized store.
#' @param selection Selections are lists containing either scalars, strings, or Slice objects. Two character
Expand Down Expand Up @@ -324,23 +323,27 @@ ZarrArray <- R6::R6Class("ZarrArray",
return(out)
}

if(getOption("pizzarr.parallel_read_enabled")) {
if(!requireNamespace("foreach", quietly = TRUE)) {
stop("Parallel reading requires the 'foreach' package.")
}
parts <- indexer$iter()
part1_results <- foreach::foreach(proj=parts) %dopar% {
private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}
for(i in seq_along(parts)) {
proj <- parts[[i]]
part1_result <- part1_results[[i]]
private$chunk_getitem_part2(part1_result, proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}
} else {
for(proj in indexer$iter()) {
private$chunk_getitem(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
parallel_option <- getOption("pizzarr.parallel_read_enabled")
cl <- parse_parallel_option(parallel_option)
is_parallel <- is_truthy_parallel_option(cl)

apply_func <- lapply
if(is_parallel) {
if(!requireNamespace("pbapply", quietly = TRUE)) {
stop("Parallel reading requires the 'pbapply' package.")
}
apply_func <- pbapply::pblapply
}

parts <- indexer$iter()
part1_results <- apply_func(parts, function(proj, cl = NA) {
private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}, cl = cl)

for(i in seq_along(parts)) {
proj <- parts[[i]]
part1_result <- part1_results[[i]]
private$chunk_getitem_part2(part1_result, proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}

# Return scalar instead of zero-dimensional array.
Expand Down Expand Up @@ -455,21 +458,26 @@ ZarrArray <- R6::R6Class("ZarrArray",
stop("Unknown data type for setting :(")
}

if(getOption("pizzarr.parallel_write_enabled")) {
if(!requireNamespace("foreach", quietly=TRUE)) {
stop("Parallel writing requires the 'foreach' package.")
}
foreach::foreach(proj=indexer$iter(), .combine = c, .inorder = FALSE, .init = NULL) %dopar% {
chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape)
private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value)
NULL # return null since we are not using the combined result
}
} else {
for (proj in indexer$iter()) {
chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape)
private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value)
parallel_option <- getOption("pizzarr.parallel_write_enabled")
cl <- parse_parallel_option(parallel_option)
is_parallel <- is_truthy_parallel_option(cl)

apply_func <- lapply
if(is_parallel) {
if(!requireNamespace("pbapply", quietly=TRUE)) {
stop("Parallel writing requires the 'pbapply' package.")
}
apply_func <- pbapply::pblapply
}

parts <- indexer$iter()
apply_func(parts, function(proj, cl = NA) {
chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape)
private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value)
NULL
}, cl = cl)

return()
}
},
#' @description
Expand Down
1 change: 1 addition & 0 deletions pkgdown/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ articles:
- ome-ngff
- remote-ome-ngff
- remote-anndata
- parallel
- title: Articles
navbar: Troubleshooting
contents:
Expand Down
29 changes: 9 additions & 20 deletions tests/testthat/test-parallel.R
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
library(pizzarr)

pbapply::pboptions(type = "none")

SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore",
inherit = DirectoryStore,
public = list(
get_item = function(key) {
# Simulate a slow read such as an HTTP request.
Sys.sleep(1.0/50.0)
Sys.sleep(1.0/25.0)
return(super$get_item(key))
}
)
Expand All @@ -16,7 +18,7 @@ SlowSettingDirectoryStore <- R6::R6Class("SlowSettingDirectoryStore",
public = list(
set_item = function(key, value) {
# Simulate a slow write such as an HTTP request.
Sys.sleep(1.0/50.0)
Sys.sleep(1.0/25.0)
return(super$set_item(key, value))
}
)
Expand All @@ -33,37 +35,25 @@ get_dog_arr <- function(slow_setting = FALSE) {
store <- SlowGettingDirectoryStore$new(root)
}

g <- zarr_open_group(store)

# Using the OME metadata, get the path to the first resolution of the image pyramid.
attrs <- g$get_attrs()$to_list()
resolution_paths <- attrs$multiscales[[1]]$datasets[[1]]$path
first_resolution <- resolution_paths[[1]]

# Load the 3-dimensional array of RGB pixels (as a ZarrArray instance).
zarr_arr <- g$get_item(first_resolution)

zarr_arr <- zarr_open(store = store, path = "/0")
return(zarr_arr)
}

run_parallel_get <- function(num_workers) {
options(pizzarr.parallel_read_enabled = TRUE)
doParallel::registerDoParallel(num_workers)

options(pizzarr.parallel_read_enabled = num_workers)

zarr_arr <- get_dog_arr()
arr <- zarr_arr$get_item("...")$data

doParallel::stopImplicitCluster()
options(pizzarr.parallel_read_enabled = FALSE)

return(sum(arr))
}


run_parallel_set <- function(num_workers) {
options(pizzarr.parallel_write_enabled = TRUE)
doParallel::registerDoParallel(num_workers)

options(pizzarr.parallel_write_enabled = num_workers)

zarr_arr <- get_dog_arr(slow_setting = TRUE)
arr <- zarr_arr$get_item("...")$data

Expand All @@ -72,7 +62,6 @@ run_parallel_set <- function(num_workers) {

doubled_arr <- zarr_arr$get_item("...")$data

doParallel::stopImplicitCluster()
options(pizzarr.parallel_write_enabled = FALSE)

return(sum(doubled_arr))
Expand Down
128 changes: 128 additions & 0 deletions vignettes/parallel.Rmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
---
title: "Read and write in parallel"
output: rmarkdown::html_vignette
vignette: >
%\VignetteIndexEntry{Read and write in parallel}
%\VignetteEngine{knitr::rmarkdown}
%\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
out.width = "100%"
)
devtools::load_all()
```

By default, reads and writes are performed sequentially (i.e., not in parallel).
Users can opt in to parallel reads and writes by setting the `pizzarr.parallel_read_enabled` and `pizzarr.parallel_write_enabled` options via `options()`.

## Simulate slow operations

```{r}
# A DirectoryStore subclass whose reads and writes each pause for half a
# second before delegating to the parent implementation.
SlowDirectoryStore <- R6::R6Class(
  classname = "SlowDirectoryStore",
  inherit = DirectoryStore,
  public = list(
    get_item = function(key) {
      # Simulate a slow read.
      Sys.sleep(0.5)
      super$get_item(key)
    },
    set_item = function(key, value) {
      # Simulate a slow write.
      Sys.sleep(0.5)
      super$set_item(key, value)
    }
  )
)
```

## Read in parallel

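Set the option to an integer >= 2 to use forking-based parallelism with that many worker processes.
This value is passed to the `cl` parameter of `pbapply::pblapply`, which ignores integer values on Windows (where forking is unavailable); on Windows, use the `future` backend described below instead.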

```{r}
options(pizzarr.parallel_read_enabled = 4)
root <- pizzarr_sample("dog.ome.zarr")
store <- SlowDirectoryStore$new(root)
zarr_arr <- zarr_open(store = store, path = "/0")
arr <- zarr_arr$get_item("...")$data
sum(arr)
```

## Write in parallel

```{r}
options(pizzarr.parallel_write_enabled = 4)
root <- pizzarr_sample("dog.ome.zarr")
store <- SlowDirectoryStore$new(root)
zarr_arr <- zarr_open(store = store, path = "/0")
arr <- zarr_arr$get_item("...")$data
zarr_arr$set_item("...", arr * 2.0)
doubled_arr <- zarr_arr$get_item("...")$data
sum(doubled_arr)
```

## Parallel operations with future backend

To use the `future` backend for `pbapply`, set the value of the option to the string `"future"`.

Cluster-based:

```{r}
options(pizzarr.parallel_read_enabled = "future")
cl <- parallel::makeCluster(2)
future::plan(future::cluster, workers = cl)
root <- pizzarr_sample("dog.ome.zarr")
store <- SlowDirectoryStore$new(root)
zarr_arr <- zarr_open(store = store, path = "/0")
arr <- zarr_arr$get_item("...")$data
sum(arr)
parallel::stopCluster(cl)
```

Multisession-based:

```{r}
options(pizzarr.parallel_read_enabled = "future")
future::plan(future::multisession, workers = 4)
root <- pizzarr_sample("dog.ome.zarr")
store <- SlowDirectoryStore$new(root)
zarr_arr <- zarr_open(store = store, path = "/0")
arr <- zarr_arr$get_item("...")$data
sum(arr)
```

## Sequential operations

To return to sequential mode, run:

```{r}
options(
pizzarr.parallel_read_enabled = FALSE,
pizzarr.parallel_write_enabled = FALSE
)
```

## Disable progress bar

Parallel operations are implemented with `pbapply`.
To disable the progress bar, run:

```{r}
pbapply::pboptions(type = "none")
```

To re-enable, run:

```{r}
pbapply::pboptions(type = "timer")
```

0 comments on commit ac03900

Please sign in to comment.