diff --git a/DESCRIPTION b/DESCRIPTION index 38ac3ed..848a8aa 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: pizzarr Type: Package -Title: Slice into Zarr arrays in R +Title: Slice into Zarr Arrays in R Version: 0.1.0 Authors@R: c( person( diff --git a/R/options.R b/R/options.R index c886bc7..0fd5b29 100644 --- a/R/options.R +++ b/R/options.R @@ -1,21 +1,29 @@ # Adapted from https://github.com/IRkernel/IRkernel/blob/master/R/options.r #' pizzarr_option_defaults +#' @description +#' * pizzarr.http_store_cache_time_seconds how long to cache web requests +#' * pizzarr.parallel_backend "future", a cluster object, or an integer (if not on windows) +#' * pizzarr.parallel_write_enabled logical, whether to use parallel backend for writing +#' * pizzarr.progress_bar logical whether to use `pbapply` to emit a progress bar #' @export pizzarr_option_defaults <- list( pizzarr.http_store_cache_time_seconds = 3600, - pizzarr.parallel_read_enabled = FALSE, - pizzarr.parallel_write_enabled = FALSE + pizzarr.parallel_backend = NA, + pizzarr.parallel_write_enabled = FALSE, + pizzarr.progress_bar = FALSE ) #' @keywords internal parse_parallel_option <- function(val) { + if(is.na(val)) return(val) + if(inherits(val, "cluster")) { return(val) } - if(val == "future") { + if(!is.na(val) && val == "future") { return("future") } @@ -34,8 +42,9 @@ parse_parallel_option <- function(val) { #' @keywords internal from_env <- list( PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer, - PIZZARR_PARALLEL_READ_ENABLED = parse_parallel_option, - PIZZARR_PARALLEL_WRITE_ENABLED = parse_parallel_option + PIZZARR_PARALLEL_BACKEND = parse_parallel_option, + PIZZARR_PARALLEL_WRITE_ENABLED = as.logical, + PIZZARR_PROGRESS_BAR = as.logical ) # converts e.g. jupyter.log_level to JUPYTER_LOG_LEVEL diff --git a/R/stores.R b/R/stores.R index 18179db..38d258c 100644 --- a/R/stores.R +++ b/R/stores.R @@ -348,12 +348,14 @@ MemoryStore <- R6::R6Class("MemoryStore", #' @description #' Store class that uses HTTP requests. #' Read-only. Depends on the `crul` package. -#' @details For parallel operation, set the "pizzarr.parallel_read_enabled" option +#' @details For parallel operation, set the "pizzarr.parallel_backend" option #' to one of: #' * `"future"` if a future plan has been set up #' * `integer` if you would like a one-time use cluster created per call #' * `cluster` object created with `parallel::make_cluster()` if you want to reuse a cluster #' +#' Set the option "pizzarr.progress_bar" to TRUE to get a progress bar for long running reads. +#' #' For more, see `vignette("parallel").` #' @rdname HttpStore #' @importFrom memoise memoise timeout @@ -375,7 +377,11 @@ HttpStore <- R6::R6Class("HttpStore", key <- item_to_key(item) path <- paste(private$base_path, key, sep="/") - parallel_option <- getOption("pizzarr.parallel_read_enabled") + ret <- try_from_zmeta(key, self) + + if(!is.null(ret)) return(ret) + + parallel_option <- getOption("pizzarr.parallel_backend", NA) parallel_option <- parse_parallel_option(parallel_option) is_parallel <- is_truthy_parallel_option(parallel_option) diff --git a/R/utils.R b/R/utils.R index a88129f..749aab9 100644 --- a/R/utils.R +++ b/R/utils.R @@ -374,12 +374,12 @@ item_to_key <- function(item) { #' @keywords internal is_truthy_parallel_option <- function(val) { - if(inherits(val, "cluster")) { - return(TRUE) - } - if(val == "future") { - return(TRUE) - } + if(is.na(val)) return(FALSE) + + if(inherits(val, "cluster")) return(TRUE) + + if(val == "future") return(TRUE) + return(as.logical(as.integer(val))) } diff --git a/R/zarr-array.R b/R/zarr-array.R index ce0d80a..7c6bba9 100644 --- a/R/zarr-array.R +++ b/R/zarr-array.R @@ -16,71 +16,70 @@ ZarrArray <- R6::R6Class("ZarrArray", # store Array store, already initialized. #' @keywords internal store = NULL, - #' chunk_store Separate storage for chunks. If not provided, `store` will be used for storage of both chunks and metadata. + # chunk_store Separate storage for chunks. If not provided, `store` will be used for storage of both chunks and metadata. #' @keywords internal chunk_store = NULL, - #' path Storage path. String, optional. + # path Storage path. String, optional. #' @keywords internal path = NULL, - #' read_only True if array should be protected against modification. + # read_only True if array should be protected against modification. #' @keywords internal read_only = NULL, - #' synchronizer Array synchronizer. Object, optional. + # synchronizer Array synchronizer. Object, optional. #' @keywords internal synchronizer = NULL, - #' cache_metadata If True (default), array configuration metadata will be cached. If False, metadata will be reloaded prior to all data access and modification. + # cache_metadata If True (default), array configuration metadata will be cached. If False, metadata will be reloaded prior to all data access and modification. #' @keywords internal cache_metadata = NULL, - #' cache_attrs If True (default), user attributes will be cached. If False, attributes will be reloaded prior to all data access and modification. + # cache_attrs If True (default), user attributes will be cached. If False, attributes will be reloaded prior to all data access and modification. #' @keywords internal cache_attrs = NULL, - #' write_empty_chunks If True, all chunks will be stored regardless of their contents. If False (default), each chunk is compared to the array's fill value prior to storing. If a chunk is uniformly equal to the fill value, then that chunk is not be stored, and the store entry for that chunk's key is deleted. + # write_empty_chunks If True, all chunks will be stored regardless of their contents. If False (default), each chunk is compared to the array's fill value prior to storing. If a chunk is uniformly equal to the fill value, then that chunk is not be stored, and the store entry for that chunk's key is deleted. #' @keywords internal write_empty_chunks = NULL, - #' key_prefix TODO + # key_prefix TODO #' @keywords internal key_prefix = NULL, - #' is_view TODO + # is_view TODO #' @keywords internal is_view = NULL, - #' attrs TODO + # attrs TODO #' @keywords internal attrs = NULL, - #' meta TODO + # meta TODO #' @keywords internal meta = NULL, - #' shape TODO + # shape TODO #' @keywords internal shape = NULL, - #' chunks TODO + # chunks TODO #' @keywords internal chunks = NULL, - #' dtype TODO + # dtype TODO #' @keywords internal dtype = NULL, - #' fill_value TODO + # fill_value TODO #' @keywords internal fill_value = NULL, - #' order TODO + # order TODO #' @keywords internal order = NULL, - #' dimension_separator TODO + # dimension_separator TODO #' @keywords internal dimension_separator = NULL, - #' compressor TODO + # compressor TODO #' @keywords internal compressor = NULL, - #' filters TODO + # filters TODO #' @keywords internal filters = NULL, - #' vindex TODO + # vindex TODO #' @keywords internal vindex = NULL, - #' oindex TODO + # oindex TODO #' @keywords internal oindex = NULL, - #' method_description - #' (Re)load metadata from store without synchronization (file locking). + # (Re)load metadata from store without synchronization (file locking). load_metadata_nosync = function() { mkey <- paste0(private$key_prefix, ARRAY_META_KEY) @@ -135,28 +134,28 @@ ZarrArray <- R6::R6Class("ZarrArray", } private$dtype <- normalize_dtype(meta$dtype, object_codec = object_codec) }, - #' method_description - #' Load or reload metadata from store. + # method_description + # Load or reload metadata from store. load_metadata = function() { private$load_metadata_nosync() # TODO: support for synchronization }, - #' method_description - #' Referesh metadata if not cached without synchronization (file locking). + # method_description + # Referesh metadata if not cached without synchronization (file locking). refresh_metadata_nosync = function() { if(!private$cache_metadata && !private$is_view) { private$load_metadata_nosync() } }, - #' method_description - #' Refresh metadata from store if not cached. + # method_description + # Refresh metadata from store if not cached. refresh_metadata = function() { if(!private$cache_metadata) { private$load_metadata() } }, - #' method_description - #' Write metadata to store without synchronization (file locking). + # method_description + # Write metadata to store without synchronization (file locking). flush_metadata_nosync = function() { if(private$is_view) { stop("Operation not permitted for views") @@ -188,14 +187,14 @@ ZarrArray <- R6::R6Class("ZarrArray", encoded_meta <- private$store$metadata_class$encode_array_metadata(zarray_meta) private$store$set_item(mkey, encoded_meta) }, - #' method_description - #' TODO + # method_description + # TODO chunk_key = function(chunk_coords) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L2063 return(paste0(private$key_prefix, do.call(paste, c(as.list(chunk_coords), sep = private$dimension_separator)))) }, - #' method_description - #' TODO + # method_description + # TODO compute_cdata_shape = function() { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L428 if(is.null(private$shape)) { @@ -212,8 +211,8 @@ ZarrArray <- R6::R6Class("ZarrArray", cdata_shape <- as.numeric(cdata_shape) return(cdata_shape) }, - #' method_description - #' Resize an array without synchronization (file locking) + # method_description + # Resize an array without synchronization (file locking) resize_nosync = function(...) { # Note: When resizing an array, the data are not rearranged in any way. # If one or more dimensions are shrunk, any chunks falling outside the @@ -252,8 +251,8 @@ ZarrArray <- R6::R6Class("ZarrArray", } } }, - #' method_description - #' TODO + # method_description + # TODO get_basic_selection_zd = function(selection = NA, out = NA, fields = NA) { # Special case basic selection for zero-dimensional array # Check selection is valid @@ -296,14 +295,14 @@ ZarrArray <- R6::R6Class("ZarrArray", } return(out) }, - #' method_description - #' TODO + # method_description + # TODO get_basic_selection_nd = function(selection = NA, out = NA, fields = NA) { indexer <- BasicIndexer$new(selection, self) return(private$get_selection(indexer, out = out, fields = fields)) }, - #' method_description - #' TODO + # method_description + # TODO get_selection = function(indexer, out = NA, fields = NA) { # Reference: https://github.com/gzuidhof/zarr.js/blob/292804/src/core/index.ts#L304 # We iterate over all chunks which overlap the selection and thus contain data @@ -323,38 +322,13 @@ ZarrArray <- R6::R6Class("ZarrArray", return(out) } - parallel_option <- getOption("pizzarr.parallel_read_enabled") - cl <- parse_parallel_option(parallel_option) - is_parallel <- is_truthy_parallel_option(cl) - apply_func <- lapply - if(is_parallel) { - if(!requireNamespace("pbapply", quietly = TRUE)) { - stop("Parallel reading requires the 'pbapply' package.") - } - - if(is.integer(cl) & .Platform$OS.type == "windows") { - # See #105 - cl <- parallel::makeCluster(cl) - on.exit(parallel::stopCluster(cl)) - } - - apply_func <- function(X, FUN, ..., cl = NULL) { - - if(isTRUE(cl == "future")) { - pbapply::pblapply(X, FUN, ..., - future.packages = "Rarr", - future.seed=TRUE, cl = cl) - } else { - pbapply::pblapply(X, FUN, ..., cl = cl) - } - } - - } + ps <- get_parallel_settings(parallel_option = getOption("pizzarr.parallel_backend", NA)) + if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) parts <- indexer$iter() - part1_results <- apply_func(parts, function(proj, cl = NA) { + part1_results <- ps$apply_func(parts, function(proj) { private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) - }, cl = cl) + }, cl = ps$cl) for(i in seq_along(parts)) { proj <- parts[[i]] @@ -369,8 +343,8 @@ ZarrArray <- R6::R6Class("ZarrArray", return(out) }, - #' method_description - #' TODO + # method_description + # TODO set_basic_selection_zd = function(selection, value, fields = NA) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/core.py#L1625 @@ -432,14 +406,14 @@ ZarrArray <- R6::R6Class("ZarrArray", c_data <- private$encode_chunk(chunk_raw) self$get_chunk_store()$set_item(c_key, c_data) }, - #' method_description - #' TODO + # method_description + # TODO set_basic_selection_nd = function(selection, value, fields = NA) { indexer <- BasicIndexer$new(selection, self) return(private$set_selection(indexer, value = value, fields = fields)) }, - #' method_description - #' TODO + # method_description + # TODO set_selection = function(indexer, value, fields = NA) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L1682 # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L566 @@ -474,55 +448,33 @@ ZarrArray <- R6::R6Class("ZarrArray", stop("Unknown data type for setting :(") } - parallel_option <- getOption("pizzarr.parallel_write_enabled") - cl <- parse_parallel_option(parallel_option) - is_parallel <- is_truthy_parallel_option(cl) + par_opt <- NA - apply_func <- lapply - if(is_parallel) { - if(!requireNamespace("pbapply", quietly=TRUE)) { - stop("Parallel writing requires the 'pbapply' package.") - } - - if(is.integer(cl) & .Platform$OS.type == "windows") { - # See #105 - cl <- parallel::makeCluster(cl) - on.exit(parallel::stopCluster(cl)) - - } - - apply_func <- function(X, FUN, ..., cl = NULL) { - - if(isTRUE(cl == "future")) { - pbapply::pblapply(X, FUN, ..., - future.packages = "Rarr", - future.seed=TRUE, cl = cl) - } else { - pbapply::pblapply(X, FUN, ..., cl = cl) - } - } - + if(getOption("pizzarr.parallel_write_enabled", FALSE)) { + par_opt <- getOption("pizzarr.parallel_backend", NA) } - + ps <- get_parallel_settings(parallel_option = par_opt) + if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) + parts <- indexer$iter() - apply_func(parts, function(proj, cl = NA) { + ps$apply_func(parts, function(proj) { chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape) private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value) NULL - }, cl = cl) + }, cl = ps$cl) return() } }, - #' method_description - #' TODO + # method_description + # TODO process_chunk = function(out, cdata, chunk_selection, drop_axes, out_is_ndarray, fields, out_selection, partial_read_decode = FALSE) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L1755 # TODO }, - #' method_description - #' TODO + # method_description + # TODO get_chunk_value = function(proj, indexer, value, selection_shape) { # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L550 @@ -541,13 +493,13 @@ ZarrArray <- R6::R6Class("ZarrArray", } return(chunk_value) }, - #' method_description - #' TODO + # method_description + # TODO chunk_buffer_to_raw_array = function(decoded_chunk) { # TODO }, - #' method_description - #' For parallel usage + # method_description + # For parallel usage chunk_getitem_part1 = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { if(length(chunk_coords) != length(private$chunks)) { stop("Inconsistent shapes: chunkCoordsLength: ${chunkCoords.length}, cDataShapeLength: ${this.chunkDataShape.length}") @@ -567,8 +519,8 @@ ZarrArray <- R6::R6Class("ZarrArray", }) return(result) }, - #' method_description - #' For parallel usage + # method_description + # For parallel usage chunk_getitem_part2 = function(part1_result, chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { c_key <- private$chunk_key(chunk_coords) @@ -610,8 +562,8 @@ ZarrArray <- R6::R6Class("ZarrArray", } } }, - #' method_description - #' For non-parallel usage + # method_description + # For non-parallel usage chunk_getitem = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { # TODO # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L380 @@ -658,13 +610,13 @@ ZarrArray <- R6::R6Class("ZarrArray", } }) }, - #' method_description - #' TODO + # method_description + # TODO chunk_getitems = function(lchunk_coords, lchunk_selection, out, lout_selection, drop_axes = NA, fields = NA) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO chunk_setitem = function(chunk_coords, chunk_selection, value, fields = NA) { # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L625 @@ -741,31 +693,31 @@ ZarrArray <- R6::R6Class("ZarrArray", chunk_data <- private$encode_chunk(chunk_raw) self$get_chunk_store()$set_item(chunk_key, chunk_data) }, - #' method_description - #' TODO + # method_description + # TODO chunk_setitem_nosync = function(chunk_coords, chunk_selection, value, fields = NA) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO chunk_setitems = function(lchunk_coords, lchunk_selection, values, fields = NA) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO process_for_setitem = function(ckey, chunk_selection, value, fields = NA) { # TODO }, chunk_delitem = function(ckey) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO chunk_delitems = function(ckeys) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO decode_chunk = function(cdata, start = NA, nitems = NA, expected_shape = NA) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/core.py#L2066 # decompress @@ -806,8 +758,8 @@ ZarrArray <- R6::R6Class("ZarrArray", # ensure correct chunk shape return(chunk) }, - #' method_description - #' TODO + # method_description + # TODO encode_chunk = function(chunk_as_raw) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/core.py#L2105 @@ -837,8 +789,8 @@ ZarrArray <- R6::R6Class("ZarrArray", return(cdata) }, - #' method_description - #' TODO + # method_description + # TODO append_nosync = function(data, axis = 0) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L2141 # TODO @@ -1281,7 +1233,6 @@ ZarrArray <- R6::R6Class("ZarrArray", }, #' @description #' Convert Zarr object to R array (for S3 method). Note that this loads all data into memory. - #' #' @return array as.array = function() { return(self$get_item("...")$data) @@ -1317,3 +1268,93 @@ ZarrArray <- R6::R6Class("ZarrArray", as.array.ZarrArray = function(x, ...) { x$as.array() } + +#' get parallel settings +#' @keywords internal +#' @description +#' given information about user preferences and environment conditions, returns a function +#' and cluster object. +#' @param on_windows logical indicating if windows restrictions should apply +#' @param parallel_option integer, or "future" to control how parallelization occurs. +#' @param progress logical to control whether `pbapply` is used such that progress is printed. +#' @return list containing the function to use in parallel operations, a cluster object to be used +#' in parallel operations, and whether or not the cluster object needs to be closed. +#' +get_parallel_settings <- function(on_windows = (.Platform$OS.type == "windows"), + parallel_option = getOption("pizzarr.parallel_backend", NA), + progress = getOption("pizzarr.progress_bar", FALSE)) { + + cl <- parse_parallel_option(parallel_option) + is_parallel <- is_truthy_parallel_option(cl) + + # fall back on lapply + apply_func <- function(X, FUN, ..., cl = NULL) { + lapply(X, FUN, ...) + } + + # triggers closing a temporary cluster + close <- FALSE + + if(is_parallel) { + + # check for pbapply + if(progress & !requireNamespace("pbapply", quietly = TRUE)) { + # NOTEST + progress <- FALSE + warning("progress bar operations requires the 'pbapply' package.") + } + + if(isTRUE(cl == "future")) { + + if(!requireNamespace("future.apply", quietly = TRUE)) { + # NOTEST + warning("cluster options is 'future' but future.apply not available.") + + } else { # we can use future + if(progress) { + apply_func <- function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., + future.packages = "Rarr", + future.seed = TRUE, cl = cl) + } + } else { + apply_func <- function(X, FUN, ..., cl = NULL) { + future.apply::future_lapply(X, FUN, ..., + future.packages = "Rarr", + future.seed=TRUE) + } + } } + } else { + + if(!requireNamespace("parallel", quietly = TRUE)) { + # NOTEST + warning("Parallel operations require the 'parallel' or 'future' package.") + } else { + if(is.integer(cl) & on_windows) { + # See #105 + cl <- parallel::makeCluster(cl) + close <- TRUE + } + + if(progress) { + apply_func <- function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + } + } else if(!is.logical(cl)) { + if(on_windows) { + apply_func <- function(X, FUN, ..., cl = NULL) { + parallel::parLapply(cl, X, FUN, ...) + } + } else { + apply_func <- function(X, FUN, ..., cl = NULL) { + parallel::mclapply(X, FUN, ..., mc.cores = cl) + } + } + } + if(is.logical(cl)) cl <- NULL + } + } + } + + list(apply_func = apply_func, cl = cl, close = close) +} \ No newline at end of file diff --git a/R/zarr-group.R b/R/zarr-group.R index 1c58657..e0b40ec 100644 --- a/R/zarr-group.R +++ b/R/zarr-group.R @@ -13,28 +13,28 @@ ZarrGroup <- R6::R6Class("ZarrGroup", # store TODO #' @keywords internal store = NULL, - #' path TODO + # path TODO #' @keywords internal path = NULL, - #' read_only TODO + # read_only TODO #' @keywords internal read_only = NULL, - #' chunk_store TODO + # chunk_store TODO #' @keywords internal chunk_store = NULL, - #' cache_attrs TODO + # cache_attrs TODO #' @keywords internal cache_attrs = NULL, - #' synchronizer TODO + # synchronizer TODO #' @keywords internal synchronizer = NULL, - #' key_prefix TODO + # key_prefix TODO #' @keywords internal key_prefix = NULL, - #' meta TODO + # meta TODO #' @keywords internal meta = NULL, - #' attrs TODO + # attrs TODO #' @keywords internal attrs = NULL, item_path = function(item) { diff --git a/README.md b/README.md index 9e9fda6..530b106 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ print(selection$data) | `Unicode` | ✔ / ✔ | Converted to `character` in R. | | `void *` | ❌ / ❌ | | | Structured data types | ❌ / ❌ | | -| Object data type - [VLenUTF8](https://numcodecs.readthedocs.io/en/stable/vlen.html#vlenutf8) | ✔ / ✔ | Converted to `character` in R. | +| Object data type - [VLenUTF8](https://numcodecs.readthedocs.io/en/stable/other/vlen.html#numcodecs.vlen.VLenUTF8) | ✔ / ✔ | Converted to `character` in R. | Note: no effort is made to assess loss of precision due to conversion. @@ -120,14 +120,14 @@ pkgdown::build_site() - Note: `pizzarr` has an optional dependency on Rarr for Blosc (de)compression. - R package development - [R packages](https://r-pkgs.org/) - - [roxygen2 syntax](https://cran.r-project.org/web/packages/roxygen2/vignettes/rd-formatting.html) + - [roxygen2 syntax](https://roxygen2.r-lib.org/articles/rd-formatting.html) - [R6](https://r6.r-lib.org/index.html) - [R6 roxygen2 syntax](https://www.tidyverse.org/blog/2019/11/roxygen2-7-0-0/#r6-documentation) - [pkgdown](https://pkgdown.r-lib.org/) - Zarr implementation - [zarr_implementations](https://github.com/zarr-developers/zarr_implementations) - [zarr-python](https://github.com/zarr-developers/zarr-python) - - [LZ4 and ZSTD compression in R](https://github.com/traversc/qs) + - [LZ4 and ZSTD compression in R](https://github.com/qsbase/qs) - [zarr.js](https://github.com/gzuidhof/zarr.js) - [zarrita.js](https://github.com/manzt/zarrita.js) - [v2 spec](https://zarr.readthedocs.io/en/stable/spec/v2.html) diff --git a/man/HttpStore.Rd b/man/HttpStore.Rd index 22962a3..52a0c17 100644 --- a/man/HttpStore.Rd +++ b/man/HttpStore.Rd @@ -9,7 +9,7 @@ Store class that uses HTTP requests. Read-only. Depends on the \code{crul} package. } \details{ -For parallel operation, set the "pizzarr.parallel_read_enabled" option +For parallel operation, set the "pizzarr.parallel_backend" option to one of: \itemize{ \item \code{"future"} if a future plan has been set up @@ -17,6 +17,8 @@ to one of: \item \code{cluster} object created with \code{parallel::make_cluster()} if you want to reuse a cluster } +Set the option "pizzarr.progress_bar" to TRUE to get a progress bar for long running reads. + For more, see \verb{vignette("parallel").} } \section{Super class}{ diff --git a/man/ZarrArray.Rd b/man/ZarrArray.Rd index 8c4d8ed..916ee17 100644 --- a/man/ZarrArray.Rd +++ b/man/ZarrArray.Rd @@ -13,120 +13,7 @@ Instantiate an array from an initialized store. \details{ The Zarr Array class. } -\keyword{(Re)load} -\keyword{(default),} -\keyword{(file} -\keyword{Array} -\keyword{False} -\keyword{False,} -\keyword{For} -\keyword{If} -\keyword{Load} -\keyword{Object,} -\keyword{Referesh} -\keyword{Refresh} -\keyword{Resize} -\keyword{Separate} -\keyword{Storage} -\keyword{String,} -\keyword{TODO} -\keyword{True} -\keyword{True,} -\keyword{Write} -\keyword{`store`} -\keyword{a} -\keyword{access} -\keyword{against} -\keyword{all} -\keyword{an} -\keyword{and} -\keyword{array} -\keyword{array's} -\keyword{attributes} -\keyword{attrs} -\keyword{be} -\keyword{both} -\keyword{cache_attrs} -\keyword{cache_metadata} -\keyword{cached} -\keyword{cached.} -\keyword{chunk} -\keyword{chunk's} -\keyword{chunk_store} -\keyword{chunks} -\keyword{chunks.} -\keyword{compared} -\keyword{compressor} -\keyword{configuration} -\keyword{contents.} -\keyword{data} -\keyword{deleted.} -\keyword{dimension_separator} -\keyword{dtype} -\keyword{each} -\keyword{entry} -\keyword{equal} -\keyword{fill} -\keyword{fill_value} -\keyword{filters} -\keyword{for} -\keyword{from} -\keyword{if} \keyword{internal} -\keyword{is} -\keyword{is_view} -\keyword{key} -\keyword{key_prefix} -\keyword{locking)} -\keyword{locking).} -\keyword{meta} -\keyword{metadata} -\keyword{metadata.} -\keyword{method_description} -\keyword{modification.} -\keyword{non-parallel} -\keyword{not} -\keyword{of} -\keyword{oindex} -\keyword{optional.} -\keyword{or} -\keyword{order} -\keyword{parallel} -\keyword{path} -\keyword{path.} -\keyword{prior} -\keyword{protected} -\keyword{provided,} -\keyword{read_only} -\keyword{regardless} -\keyword{reload} -\keyword{reloaded} -\keyword{shape} -\keyword{should} -\keyword{storage} -\keyword{store} -\keyword{store.} -\keyword{stored} -\keyword{stored,} -\keyword{storing.} -\keyword{synchronization} -\keyword{synchronizer} -\keyword{synchronizer.} -\keyword{that} -\keyword{the} -\keyword{their} -\keyword{then} -\keyword{to} -\keyword{uniformly} -\keyword{usage} -\keyword{used} -\keyword{user} -\keyword{value} -\keyword{value,} -\keyword{vindex} -\keyword{will} -\keyword{without} -\keyword{write_empty_chunks} \section{Methods}{ \subsection{Public methods}{ \itemize{ diff --git a/man/ZarrGroup.Rd b/man/ZarrGroup.Rd index 421b547..47cbefc 100644 --- a/man/ZarrGroup.Rd +++ b/man/ZarrGroup.Rd @@ -10,16 +10,7 @@ Instantiate a group from an initialized store. \details{ The Zarr Group class. } -\keyword{TODO} -\keyword{attrs} -\keyword{cache_attrs} -\keyword{chunk_store} \keyword{internal} -\keyword{key_prefix} -\keyword{meta} -\keyword{path} -\keyword{read_only} -\keyword{synchronizer} \section{Methods}{ \subsection{Public methods}{ \itemize{ diff --git a/man/get_parallel_settings.Rd b/man/get_parallel_settings.Rd new file mode 100644 index 0000000..0ff53ac --- /dev/null +++ b/man/get_parallel_settings.Rd @@ -0,0 +1,28 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/zarr-array.R +\name{get_parallel_settings} +\alias{get_parallel_settings} +\title{get parallel settings} +\usage{ +get_parallel_settings( + on_windows = (.Platform$OS.type == "windows"), + parallel_option = getOption("pizzarr.parallel_backend", NA), + progress = getOption("pizzarr.progress_bar", FALSE) +) +} +\arguments{ +\item{on_windows}{logical indicating if windows restrictions should apply} + +\item{parallel_option}{integer, or "future" to control how parallelization occurs.} + +\item{progress}{logical to control whether \code{pbapply} is used such that progress is printed.} +} +\value{ +list containing the function to use in parallel operations, a cluster object to be used +in parallel operations, and whether or not the cluster object needs to be closed. +} +\description{ +given information about user preferences and environment conditions, returns a function +and cluster object. +} +\keyword{internal} diff --git a/man/pizzarr_option_defaults.Rd b/man/pizzarr_option_defaults.Rd index 2bca041..48f4231 100644 --- a/man/pizzarr_option_defaults.Rd +++ b/man/pizzarr_option_defaults.Rd @@ -5,12 +5,17 @@ \alias{pizzarr_option_defaults} \title{pizzarr_option_defaults} \format{ -An object of class \code{list} of length 3. +An object of class \code{list} of length 4. } \usage{ pizzarr_option_defaults } \description{ -pizzarr_option_defaults +\itemize{ +\item pizzarr.http_store_cache_time_seconds how long to cache web requests +\item pizzarr.parallel_backend "future", a cluster object, or an integer (if not on windows) +\item pizzarr.parallel_write_enabled logical, whether to use parallel backend for writing +\item pizzarr.progress_bar logical whether to use \code{pbapply} to emit a progress bar +} } \keyword{datasets} diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index 32192cb..ae1cc46 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -9,8 +9,12 @@ SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore", inherit = DirectoryStore, public = list( get_item = function(key) { + if(.Platform$OS.type == "windows") { + # windows has a lot of per process overhead + Sys.sleep(1/5) + } # Simulate a slow read such as an HTTP request. - Sys.sleep(1.0/25) + Sys.sleep(1/5) return(super$get_item(key)) } ) @@ -20,8 +24,12 @@ SlowSettingDirectoryStore <- R6::R6Class("SlowSettingDirectoryStore", inherit = DirectoryStore, public = list( set_item = function(key, value) { + if(.Platform$OS.type == "windows") { + # windows has a lot of per process overhead + Sys.sleep(1/5) + } # Simulate a slow write such as an HTTP request. - Sys.sleep(1.0/25) + Sys.sleep(1/5) return(super$set_item(key, value)) } ) @@ -47,19 +55,22 @@ get_dog_arr <- function(slow_setting = FALSE) { } run_parallel_get <- function(num_workers) { - options(pizzarr.parallel_read_enabled = num_workers) + options(pizzarr.parallel_backend = num_workers) + options(pizzarr.progress_bar = FALSE) zarr_arr <- get_dog_arr() arr <- zarr_arr$get_item("...")$data - options(pizzarr.parallel_read_enabled = FALSE) + options(pizzarr.parallel_backend = NA) return(sum(arr)) } run_parallel_set <- function(num_workers) { - options(pizzarr.parallel_write_enabled = num_workers) + options(pizzarr.parallel_write_enabled = TRUE) + options(pizzarr.parallel_backend = num_workers) + options(pizzarr.progress_bar = FALSE) zarr_arr <- get_dog_arr(slow_setting = TRUE) arr <- zarr_arr$get_item("...")$data @@ -70,18 +81,16 @@ run_parallel_set <- function(num_workers) { doubled_arr <- zarr_arr$get_item("...")$data options(pizzarr.parallel_write_enabled = FALSE) + options(pizzarr.parallel_backend = NA) return(sum(doubled_arr)) } -cl1 <- parallel::makeCluster(1) -cl2 <- parallel::makeCluster(2) - test_that("can run get_item() and set_item in parallel", { bench_df <- bench::mark( - run_parallel_get(cl1), - run_parallel_get(cl2), + run_parallel_get(1), + run_parallel_get(2), iterations = 1, memory = FALSE, filter_gc = FALSE @@ -89,8 +98,10 @@ test_that("can run get_item() and set_item in parallel", { expect_equal(unlist(bench_df$result), rep(134538481, 2)) + testthat::skip_on_covr() testthat::skip_on_os("windows") # injecting parallel workers this way on windows doesn't work + expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) @@ -98,8 +109,8 @@ test_that("can run get_item() and set_item in parallel", { test_that("can run set_item() in parallel", { bench_df <- bench::mark( - run_parallel_set(cl1), - run_parallel_set(cl2), + run_parallel_set(1), + run_parallel_set(2), iterations = 1, memory = FALSE, filter_gc = FALSE @@ -107,12 +118,16 @@ test_that("can run set_item() in parallel", { expect_equal(unlist(bench_df$result), rep(134538481*2.0, 2)) + testthat::skip_on_covr() testthat::skip_on_os("windows") # injecting parallel workers this way on windows doesn't work + expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) +cl1 <- parallel::makeCluster(1) + test_that("parse_parallel_option works as expected", { expect_equal(parse_parallel_option(cl1), cl1) expect_equal(parse_parallel_option("future"), "future") @@ -138,7 +153,137 @@ test_that("is_truthy_parallel_option works as expected", { expect_equal(is_truthy_parallel_option(2), TRUE) }) +test_that("get_parallel_settings", { + testthat::skip_on_covr() # need to debug why this breaks covr run + + # Case 1: not parallel + ps <- get_parallel_settings(parallel_option = NA) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + lapply(X, FUN, ...) + })) + + expect_equal(ps$cl, NA) + + # Case 2a1: Future, progress + ps <- get_parallel_settings(parallel_option = "future", + progress = TRUE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., + future.packages = "Rarr", + future.seed = TRUE, cl = cl) + })) + + expect_equal(ps$cl, "future") + + # Case 2a2: Future, no progress + + ps <- get_parallel_settings(parallel_option = "future", + progress = FALSE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + future.apply::future_lapply(X, FUN, ..., + future.packages = "Rarr", + future.seed=TRUE) + })) + + expect_equal(ps$cl, "future") + + # Case 2b1: cl = integer > 1, windows, progress = TRUE + + ps <- get_parallel_settings(on_windows = TRUE, + parallel_option = 2, + progress = TRUE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_true(inherits(ps$cl, "cluster")) + + # Case 2b2: cl = 1, progress = TRUE, windows doesn't matter + + ps <- get_parallel_settings(parallel_option = 1, + progress = TRUE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_equal(ps$cl, NULL) + + # Case 2b3: cl = 2, progress = TRUE, not windows + + ps <- get_parallel_settings(on_windows = FALSE, + parallel_option = 2, + progress = TRUE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_equal(ps$cl, 2) + + # Case 2b4: cl = 2, not windows, progress + + ps <- get_parallel_settings(on_windows = FALSE, + parallel_option = 2, + progress = TRUE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_equal(ps$cl, 2) + + # case 2b5 cl = 1, no progress + + ps <- get_parallel_settings(parallel_option = 1, + progress = FALSE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + lapply(X, FUN, ...) + })) + + expect_equal(ps$cl, NULL) + + # Case 2b6: cl = 2, no progress, on windows + + ps <- get_parallel_settings(on_windows = TRUE, + parallel_option = 2, + progress = FALSE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + parallel::parLapply(cl, X, FUN, ...) + })) + + expect_true(inherits(ps$cl, "cluster")) + + # Case 2b7: cl = 2, no progress, not windows + + ps <- get_parallel_settings(on_windows = FALSE, + parallel_option = 2, + progress = FALSE) + + expect_equal(format(ps$apply_func), + format(function(X, FUN, ..., cl = NULL) { + parallel::mclapply(X, FUN, ..., mc.cores = cl) + })) + + expect_equal(ps$cl, 2) + +}) + parallel::stopCluster(cl1) -parallel::stopCluster(cl2) if(clean) unlink(sample_dir, recursive = TRUE) diff --git a/vignettes/parallel.Rmd b/vignettes/parallel.Rmd index 8209633..dee59e6 100644 --- a/vignettes/parallel.Rmd +++ b/vignettes/parallel.Rmd @@ -52,10 +52,10 @@ SlowDirectoryStore <- R6::R6Class("SlowDirectoryStore", ## Read in parallel Provide an integer >= 2 or a cluster object to the option to use forking-based parallelism. -This value will be passed to the `cl` parameter of `pbapply::pblapply`. +This value will be passed to the `cl` parameter of `parallel::parLapply`, `future.apply::future_lapply`, or if the option "pizzarr.progress_bar" is `TRUE`, `pbapply::pblapply`. ```{r} -options(pizzarr.parallel_read_enabled = 4) +options(pizzarr.parallel_backend = 4) root <- pizzarr_sample("dog.ome.zarr") store <- SlowDirectoryStore$new(root) @@ -85,7 +85,7 @@ To use the `future` backend for `pbapply`, set the value of the option to the st Cluster-based: ```{r} -options(pizzarr.parallel_read_enabled = "future") +options(pizzarr.parallel_backend = "future") cl <- parallel::makeCluster(2) future::plan(future::cluster, workers = cl) @@ -102,7 +102,7 @@ parallel::stopCluster(cl) Multisession-based: ```{r} -options(pizzarr.parallel_read_enabled = "future") +options(pizzarr.parallel_backend = "future") oldplan <- future::plan() future::plan(future::multisession, workers = 4) @@ -122,7 +122,7 @@ To return to sequential mode, run: ```{r} options( - pizzarr.parallel_read_enabled = FALSE, + pizzarr.parallel_backend = NA, pizzarr.parallel_write_enabled = FALSE ) ```