From 26cb66a7b2688287d27ea5eaf5939264bc5730e1 Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Thu, 19 Dec 2024 22:03:13 -0600 Subject: [PATCH 01/10] improve sample data fetch and parallel tests --- R/utils.R | 23 +++++++++++++++++------ man/pizzarr_sample.Rd | 14 +++++++++++--- tests/testthat/test-01-parallel.R | 19 ++++++++++++++++++- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/R/utils.R b/R/utils.R index 0411d8e..a88129f 100644 --- a/R/utils.R +++ b/R/utils.R @@ -10,13 +10,22 @@ #' @return path to ready to use zarr store #' @export #' @examples -#' zarr_samples <- pizzarr_sample() +#' +#' sample_dir <- tools::R_user_dir("pizzarr") +#' +#' clean <- !dir.exists(sample_dir) +#' +#' zarr_samples <- pizzarr_sample(outdir = sample_dir) #' #' #printing without system path for example -#' gsub(tempdir(), "...", zarr_samples, fixed = TRUE) +#' gsub(sample_dir, "...", zarr_samples, fixed = TRUE) +#' +#' # clean up if you don't want to keep them for next time +#' if(clean) unlink(sample_dir, recursive = TRUE) #' pizzarr_sample <- function(dataset = NULL, - outdir = file.path(tempdir(TRUE), "pizzarr_sample")) { + outdir = file.path(tools::R_user_dir("pizzarr"), + "pizzarr_sample")) { # will unzip here tdir <- outdir dir.create(tdir, showWarnings = FALSE, recursive = TRUE) @@ -45,7 +54,9 @@ pizzarr_sample <- function(dataset = NULL, # in case zarr_zips is all, loop over them and unzip for(z in seq_along(zarr_zips)) { - if(file.size(zarr_zips[z]) == 0) { + need <- !file.exists(file.path(tdir, avail[z])) + + if(file.size(zarr_zips[z]) == 0 & need) { new_z <- file.path(tdir, basename(zarr_zips[z])) @@ -56,8 +67,8 @@ pizzarr_sample <- function(dataset = NULL, zarr_zips[z] <- new_z } - utils::unzip(zarr_zips[z], - exdir = file.path(tdir, dirname(avail[z]))) + if(need) + utils::unzip(zarr_zips[z], exdir = file.path(tdir, dirname(avail[z]))) } return(file.path(tdir, avail)) diff --git a/man/pizzarr_sample.Rd b/man/pizzarr_sample.Rd index 3a85c2c..f0b2308 100644 --- a/man/pizzarr_sample.Rd +++ b/man/pizzarr_sample.Rd @@ -6,7 +6,7 @@ \usage{ pizzarr_sample( dataset = NULL, - outdir = file.path(tempdir(TRUE), "pizzarr_sample") + outdir = file.path(tools::R_user_dir("pizzarr"), "pizzarr_sample") ) } \arguments{ @@ -26,9 +26,17 @@ For directory stores, unzips the store to a temporary directory and returns the resulting path. } \examples{ -zarr_samples <- pizzarr_sample() + +sample_dir <- tools::R_user_dir("pizzarr") + +clean <- !dir.exists(sample_dir) + +zarr_samples <- pizzarr_sample(outdir = sample_dir) #printing without system path for example -gsub(tempdir(), "...", zarr_samples, fixed = TRUE) +gsub(sample_dir, "...", zarr_samples, fixed = TRUE) + +# clean up if you don't want to keep them for next time +if(clean) unlink(sample_dir, recursive = TRUE) } diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index b3ce873..32192cb 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -1,5 +1,10 @@ library(pizzarr) +sample_dir <- tools::R_user_dir("pizzarr") +clean <- !dir.exists(sample_dir) + +cache <- pizzarr_sample("dog.ome.zarr") + SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore", inherit = DirectoryStore, public = list( @@ -24,8 +29,12 @@ SlowSettingDirectoryStore <- R6::R6Class("SlowSettingDirectoryStore", get_dog_arr <- function(slow_setting = FALSE) { # The path to the root of the OME-NGFF Zarr store. - root <- pizzarr_sample("dog.ome.zarr") + root <- file.path(tempdir(), "dog.ome.zarr") + + file.copy(pizzarr_sample("dog.ome.zarr"), dirname(root), + recursive = TRUE) + # Open the OME-NGFF as a DirectoryStore. if(slow_setting) { store <- SlowSettingDirectoryStore$new(root) @@ -79,6 +88,9 @@ test_that("can run get_item() and set_item in parallel", { ) expect_equal(unlist(bench_df$result), rep(134538481, 2)) + + testthat::skip_on_os("windows") + # injecting parallel workers this way on windows doesn't work expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) @@ -94,6 +106,9 @@ test_that("can run set_item() in parallel", { ) expect_equal(unlist(bench_df$result), rep(134538481*2.0, 2)) + + testthat::skip_on_os("windows") + # injecting parallel workers this way on windows doesn't work expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) @@ -125,3 +140,5 @@ test_that("is_truthy_parallel_option works as expected", { parallel::stopCluster(cl1) parallel::stopCluster(cl2) + +if(clean) unlink(sample_dir, recursive = TRUE) From 9d25077ba9ef1b2eea9b3714800289e090aa5ecb Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Thu, 19 Dec 2024 22:07:51 -0600 Subject: [PATCH 02/10] clean up after sample data --- tests/testthat/test-compat.R | 7 ++++++- tests/testthat/test-get.R | 5 +++++ tests/testthat/test-utils.R | 7 ++++++- vignettes/basics.Rmd | 5 +++++ vignettes/parallel.Rmd | 6 ++++++ 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/tests/testthat/test-compat.R b/tests/testthat/test-compat.R index 1401912..fd57496 100644 --- a/tests/testthat/test-compat.R +++ b/tests/testthat/test-compat.R @@ -1,5 +1,8 @@ library(pizzarr) +sample_dir <- tools::R_user_dir("pizzarr") +clean <- !dir.exists(sample_dir) + test_that("Can open Zarr group using convenience function", { root <- pizzarr_sample(file.path("fixtures", "v2", "data.zarr")) @@ -513,4 +516,6 @@ test_that("Can create 0-element 2D array", { expect_equal(is.character(sel$data), TRUE) expect_equal(length(sel$data), 0) expect_equal(dim(sel$data), c(0, 0)) -}) \ No newline at end of file +}) + +if(clean) unlink(sample_dir, recursive = TRUE) diff --git a/tests/testthat/test-get.R b/tests/testthat/test-get.R index 5066d6b..cc934c0 100644 --- a/tests/testthat/test-get.R +++ b/tests/testthat/test-get.R @@ -1,5 +1,8 @@ library(pizzarr) +sample_dir <- tools::R_user_dir("pizzarr") +clean <- !dir.exists(sample_dir) + test_that("get_basic_selection_zd", { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/tests/test_indexing.py#L70 a <- as_scalar(42) @@ -270,3 +273,5 @@ test_that("Can read 2D string array", { expected_arr <- t(array(data = c(row1, row2, row3, row4, row5), dim = c(10, 5))) expect_equal(expected_arr, data) }) + +if(clean) unlink(sample_dir, recursive = TRUE) diff --git a/tests/testthat/test-utils.R b/tests/testthat/test-utils.R index 3a92b79..98f6655 100644 --- a/tests/testthat/test-utils.R +++ b/tests/testthat/test-utils.R @@ -1,5 +1,8 @@ library(pizzarr) +sample_dir <- tools::R_user_dir("pizzarr") +clean <- !dir.exists(sample_dir) + test_that("demo data", { demo_data <- pizzarr_sample() @@ -215,4 +218,6 @@ test_that("NaN in json", { expect_equal(check, list(name = NULL)) expect_warning(try_fromJSON("borked", "test warning"), "test warning") -}) \ No newline at end of file +}) + +if(clean) unlink(sample_dir, recursive = TRUE) \ No newline at end of file diff --git a/vignettes/basics.Rmd b/vignettes/basics.Rmd index 4e47eb3..b205a08 100644 --- a/vignettes/basics.Rmd +++ b/vignettes/basics.Rmd @@ -13,6 +13,8 @@ vignette: > comment = "#>", out.width = "100%" ) +sample_dir <- tools::R_user_dir("pizzarr") +clean <- !dir.exists(sample_dir) ``` ## Load the package @@ -123,3 +125,6 @@ class(a) a$get_attrs()$to_list() ``` +```{r, include=FALSE} +if(clean) unlink(sample_dir, recursive = TRUE) +``` \ No newline at end of file diff --git a/vignettes/parallel.Rmd b/vignettes/parallel.Rmd index 7956d9e..8209633 100644 --- a/vignettes/parallel.Rmd +++ b/vignettes/parallel.Rmd @@ -14,6 +14,8 @@ vignette: > out.width = "100%" ) library(pizzarr) +sample_dir <- tools::R_user_dir("pizzarr") +clean <- !dir.exists(sample_dir) ``` By default, reads and writes are performed sequentially (i.e., not in parallel). @@ -138,4 +140,8 @@ To re-enable, run: ```{r} pbapply::pboptions(type = "timer") +``` + +```{r, include=FALSE} +if(clean) unlink(sample_dir, recursive = TRUE) ``` \ No newline at end of file From 2ac1a22b3e18ffc76bb2caee0425fd57c84bbca5 Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Fri, 20 Dec 2024 13:03:05 -0600 Subject: [PATCH 03/10] improve parallel settings code and testing --- R/stores.R | 2 + R/zarr-array.R | 148 +++++++++++++++------------ man/HttpStore.Rd | 2 + tests/testthat/test-01-parallel.R | 159 +++++++++++++++++++++++++++--- vignettes/parallel.Rmd | 2 +- 5 files changed, 238 insertions(+), 75 deletions(-) diff --git a/R/stores.R b/R/stores.R index 18179db..dcbec45 100644 --- a/R/stores.R +++ b/R/stores.R @@ -354,6 +354,8 @@ MemoryStore <- R6::R6Class("MemoryStore", #' * `integer` if you would like a one-time use cluster created per call #' * `cluster` object created with `parallel::make_cluster()` if you want to reuse a cluster #' +#' Set the option "pizzarr.progress_bar" to TRUE to get a progress bar for long running reads. +#' #' For more, see `vignette("parallel").` #' @rdname HttpStore #' @importFrom memoise memoise timeout diff --git a/R/zarr-array.R b/R/zarr-array.R index ce0d80a..2c1bb2f 100644 --- a/R/zarr-array.R +++ b/R/zarr-array.R @@ -323,38 +323,13 @@ ZarrArray <- R6::R6Class("ZarrArray", return(out) } - parallel_option <- getOption("pizzarr.parallel_read_enabled") - cl <- parse_parallel_option(parallel_option) - is_parallel <- is_truthy_parallel_option(cl) - apply_func <- lapply - if(is_parallel) { - if(!requireNamespace("pbapply", quietly = TRUE)) { - stop("Parallel reading requires the 'pbapply' package.") - } - - if(is.integer(cl) & .Platform$OS.type == "windows") { - # See #105 - cl <- parallel::makeCluster(cl) - on.exit(parallel::stopCluster(cl)) - } - - apply_func <- function(X, FUN, ..., cl = NULL) { - - if(isTRUE(cl == "future")) { - pbapply::pblapply(X, FUN, ..., - future.packages = "Rarr", - future.seed=TRUE, cl = cl) - } else { - pbapply::pblapply(X, FUN, ..., cl = cl) - } - } - - } + ps <- get_parallel_settings(parallel_option = getOption("pizzarr.parallel_read_enabled", FALSE)) + if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) parts <- indexer$iter() - part1_results <- apply_func(parts, function(proj, cl = NA) { + part1_results <- ps$FUN(parts, function(proj, cl = NA) { private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) - }, cl = cl) + }, cl = ps$cl) for(i in seq_along(parts)) { proj <- parts[[i]] @@ -474,43 +449,15 @@ ZarrArray <- R6::R6Class("ZarrArray", stop("Unknown data type for setting :(") } - parallel_option <- getOption("pizzarr.parallel_write_enabled") - cl <- parse_parallel_option(parallel_option) - is_parallel <- is_truthy_parallel_option(cl) - - apply_func <- lapply - if(is_parallel) { - if(!requireNamespace("pbapply", quietly=TRUE)) { - stop("Parallel writing requires the 'pbapply' package.") - } - - if(is.integer(cl) & .Platform$OS.type == "windows") { - # See #105 - cl <- parallel::makeCluster(cl) - on.exit(parallel::stopCluster(cl)) - - } - - apply_func <- function(X, FUN, ..., cl = NULL) { - - if(isTRUE(cl == "future")) { - pbapply::pblapply(X, FUN, ..., - future.packages = "Rarr", - future.seed=TRUE, cl = cl) - } else { - pbapply::pblapply(X, FUN, ..., cl = cl) - } - } - - } - + ps <- get_parallel_settings(parallel_option = getOption("pizzarr.parallel_write_enabled", FALSE)) + if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) parts <- indexer$iter() - apply_func(parts, function(proj, cl = NA) { + ps$FUN(parts, function(proj, cl = NA) { chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape) private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value) NULL - }, cl = cl) + }, cl = ps$cl) return() } @@ -1317,3 +1264,82 @@ ZarrArray <- R6::R6Class("ZarrArray", as.array.ZarrArray = function(x, ...) { x$as.array() } + +get_parallel_settings <- function(on_windows = (.Platform$OS.type == "windows"), + parallel_option = getOption("pizzarr.parallel_read_enabled", FALSE), + progress = getOption("pizzarr.progress_bar", FALSE)) { + + cl <- parse_parallel_option(parallel_option) + is_parallel <- is_truthy_parallel_option(cl) + + # fall back on lapply + apply_func <- function(X, FUN, ..., cl = NULL) { + lapply(X, FUN, ...) + } + + # triggers closing a temporary cluster + close <- FALSE + + if(is_parallel) { + + # check for pbapply + if(progress & !requireNamespace("pbapply", quietly = TRUE)) { + # NOTEST + progress <- FALSE + warning("Parallel progress bar operations requires the 'pbapply' package.") + } + + if(isTRUE(cl == "future")) { + + if(!requireNamespace("future.apply", quietly = TRUE)) { + # NOTEST + warning("cluster options is 'future' but future.apply not available.") + + } else { # we can use future + if(progress) { + apply_func <- function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., + future.packages = "Rarr", + future.seed = TRUE, cl = cl) + } + } else { + apply_func <- function(X, FUN, ..., cl = NULL) { + future.apply::future_lapply(X, FUN, ..., + future.packages = "Rarr", + future.seed=TRUE) + } + } } + } else { + + if(!requireNamespace("parallel", quietly = TRUE)) { + # NOTEST + warning("Parallel operations require the 'parallel' or 'future' package.") + } else { + if(is.integer(cl) & on_windows) { + # See #105 + cl <- parallel::makeCluster(cl) + close <- TRUE + } + + if(progress) { + apply_func <- function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + } + } else if(!is.logical(cl)) { + if(on_windows) { + apply_func <- function(X, FUN, ..., cl = NULL) { + parallel::parLapply(cl, X, FUN, ...) + } + } else { + apply_func <- function(X, FUN, ..., cl = NULL) { + parallel::mclapply(X, FUN, ..., mc.cores = cl) + } + } + } + if(is.logical(cl)) cl <- NULL + } + } + } + + list(FUN = apply_func, cl = cl, close = close) +} \ No newline at end of file diff --git a/man/HttpStore.Rd b/man/HttpStore.Rd index 22962a3..7551266 100644 --- a/man/HttpStore.Rd +++ b/man/HttpStore.Rd @@ -17,6 +17,8 @@ to one of: \item \code{cluster} object created with \code{parallel::make_cluster()} if you want to reuse a cluster } +Set the option "pizzarr.progress_bar" to TRUE to get a progress bar for long running reads. + For more, see \verb{vignette("parallel").} } \section{Super class}{ diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index 32192cb..9a1a04f 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -9,8 +9,12 @@ SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore", inherit = DirectoryStore, public = list( get_item = function(key) { + if(.Platform$OS.type == "windows") { + # windows has a lot of per process overhead + Sys.sleep(1/5) + } # Simulate a slow read such as an HTTP request. - Sys.sleep(1.0/25) + Sys.sleep(10/25) return(super$get_item(key)) } ) @@ -20,6 +24,10 @@ SlowSettingDirectoryStore <- R6::R6Class("SlowSettingDirectoryStore", inherit = DirectoryStore, public = list( set_item = function(key, value) { + if(.Platform$OS.type == "windows") { + # windows has a lot of per process overhead + Sys.sleep(1/5) + } # Simulate a slow write such as an HTTP request. Sys.sleep(1.0/25) return(super$set_item(key, value)) @@ -48,6 +56,7 @@ get_dog_arr <- function(slow_setting = FALSE) { run_parallel_get <- function(num_workers) { options(pizzarr.parallel_read_enabled = num_workers) + options(pizzarr.progress_bar = FALSE) zarr_arr <- get_dog_arr() arr <- zarr_arr$get_item("...")$data @@ -60,6 +69,7 @@ run_parallel_get <- function(num_workers) { run_parallel_set <- function(num_workers) { options(pizzarr.parallel_write_enabled = num_workers) + options(pizzarr.progress_bar = FALSE) zarr_arr <- get_dog_arr(slow_setting = TRUE) arr <- zarr_arr$get_item("...")$data @@ -74,14 +84,11 @@ run_parallel_set <- function(num_workers) { return(sum(doubled_arr)) } -cl1 <- parallel::makeCluster(1) -cl2 <- parallel::makeCluster(2) - test_that("can run get_item() and set_item in parallel", { bench_df <- bench::mark( - run_parallel_get(cl1), - run_parallel_get(cl2), + run_parallel_get(1), + run_parallel_get(2), iterations = 1, memory = FALSE, filter_gc = FALSE @@ -89,8 +96,6 @@ test_that("can run get_item() and set_item in parallel", { expect_equal(unlist(bench_df$result), rep(134538481, 2)) - testthat::skip_on_os("windows") - # injecting parallel workers this way on windows doesn't work expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) @@ -98,8 +103,8 @@ test_that("can run get_item() and set_item in parallel", { test_that("can run set_item() in parallel", { bench_df <- bench::mark( - run_parallel_set(cl1), - run_parallel_set(cl2), + run_parallel_set(1), + run_parallel_set(2), iterations = 1, memory = FALSE, filter_gc = FALSE @@ -107,12 +112,12 @@ test_that("can run set_item() in parallel", { expect_equal(unlist(bench_df$result), rep(134538481*2.0, 2)) - testthat::skip_on_os("windows") - # injecting parallel workers this way on windows doesn't work expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) +cl1 <- parallel::makeCluster(1) + test_that("parse_parallel_option works as expected", { expect_equal(parse_parallel_option(cl1), cl1) expect_equal(parse_parallel_option("future"), "future") @@ -138,7 +143,135 @@ test_that("is_truthy_parallel_option works as expected", { expect_equal(is_truthy_parallel_option(2), TRUE) }) +test_that("get_parallel_settings", { + # Case 1: not parallel + ps <- get_parallel_settings(parallel_option = FALSE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + lapply(X, FUN, ...) + })) + + expect_equal(ps$cl, FALSE) + + # Case 2a1: Future, progress + ps <- get_parallel_settings(parallel_option = "future", + progress = TRUE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., + future.packages = "Rarr", + future.seed = TRUE, cl = cl) + })) + + expect_equal(ps$cl, "future") + + # Case 2a2: Future, no progress + + ps <- get_parallel_settings(parallel_option = "future", + progress = FALSE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + future.apply::future_lapply(X, FUN, ..., + future.packages = "Rarr", + future.seed=TRUE) + })) + + expect_equal(ps$cl, "future") + + # Case 2b1: cl = integer > 1, windows, progress = TRUE + + ps <- get_parallel_settings(on_windows = TRUE, + parallel_option = 2, + progress = TRUE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_true(inherits(ps$cl, "cluster")) + + # Case 2b2: cl = 1, progress = TRUE, windows doesn't matter + + ps <- get_parallel_settings(parallel_option = 1, + progress = TRUE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_equal(ps$cl, NULL) + + # Case 2b3: cl = 2, progress = TRUE, not windows + + ps <- get_parallel_settings(on_windows = FALSE, + parallel_option = 2, + progress = TRUE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_equal(ps$cl, 2) + + # Case 2b4: cl = 2, not windows, progress + + ps <- get_parallel_settings(on_windows = FALSE, + parallel_option = 2, + progress = TRUE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + pbapply::pblapply(X, FUN, ..., cl = cl) + })) + + expect_equal(ps$cl, 2) + + # case 2b5 cl = 1, no progress + + ps <- get_parallel_settings(parallel_option = 1, + progress = FALSE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + lapply(X, FUN, ...) + })) + + expect_equal(ps$cl, NULL) + + # Case 2b6: cl = 2, no progress, on windows + + ps <- get_parallel_settings(on_windows = TRUE, + parallel_option = 2, + progress = FALSE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + parallel::parLapply(cl, X, FUN, ...) + })) + + expect_true(inherits(ps$cl, "cluster")) + + # Case 2b7: cl = 2, no progress, not windows + + ps <- get_parallel_settings(on_windows = FALSE, + parallel_option = 2, + progress = FALSE) + + expect_equal(format(ps$FUN), + format(function(X, FUN, ..., cl = NULL) { + parallel::mclapply(X, FUN, ..., mc.cores = cl) + })) + + expect_equal(ps$cl, 2) + +}) + parallel::stopCluster(cl1) -parallel::stopCluster(cl2) if(clean) unlink(sample_dir, recursive = TRUE) diff --git a/vignettes/parallel.Rmd b/vignettes/parallel.Rmd index 8209633..bacda04 100644 --- a/vignettes/parallel.Rmd +++ b/vignettes/parallel.Rmd @@ -52,7 +52,7 @@ SlowDirectoryStore <- R6::R6Class("SlowDirectoryStore", ## Read in parallel Provide an integer >= 2 or a cluster object to the option to use forking-based parallelism. -This value will be passed to the `cl` parameter of `pbapply::pblapply`. +This value will be passed to the `cl` parameter of `parallel::parLapply`, `future.apply::future_lapply`, or if the option "pizzarr.progress_bar" is `TRUE`, `pbapply::pblapply`. ```{r} options(pizzarr.parallel_read_enabled = 4) From 6a3f186f44a4b8d27c78a0c3e85520af428fd960 Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Fri, 20 Dec 2024 13:16:59 -0600 Subject: [PATCH 04/10] test nits --- tests/testthat/test-01-parallel.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index 9a1a04f..ffb9a21 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -14,7 +14,7 @@ SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore", Sys.sleep(1/5) } # Simulate a slow read such as an HTTP request. - Sys.sleep(10/25) + Sys.sleep(1.0/25) return(super$get_item(key)) } ) @@ -144,6 +144,8 @@ test_that("is_truthy_parallel_option works as expected", { }) test_that("get_parallel_settings", { + testthat::skip_on_covr() # need to debug why this breaks covr run + # Case 1: not parallel ps <- get_parallel_settings(parallel_option = FALSE) From f1eabbc715e9e299ec44b3cfdec3e52aade6129e Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Fri, 20 Dec 2024 13:22:31 -0600 Subject: [PATCH 05/10] . --- tests/testthat/test-01-parallel.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index ffb9a21..f6590b9 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -14,7 +14,7 @@ SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore", Sys.sleep(1/5) } # Simulate a slow read such as an HTTP request. - Sys.sleep(1.0/25) + Sys.sleep(1/5) return(super$get_item(key)) } ) @@ -29,7 +29,7 @@ SlowSettingDirectoryStore <- R6::R6Class("SlowSettingDirectoryStore", Sys.sleep(1/5) } # Simulate a slow write such as an HTTP request. - Sys.sleep(1.0/25) + Sys.sleep(1/5) return(super$set_item(key, value)) } ) From 233f76cd88acd48f04b498dd71b18c22b2ba750f Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Fri, 20 Dec 2024 13:32:59 -0600 Subject: [PATCH 06/10] . --- tests/testthat/test-01-parallel.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index f6590b9..7e1431d 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -96,6 +96,7 @@ test_that("can run get_item() and set_item in parallel", { expect_equal(unlist(bench_df$result), rep(134538481, 2)) + testthat::skip_on_covr() expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) @@ -112,6 +113,7 @@ test_that("can run set_item() in parallel", { expect_equal(unlist(bench_df$result), rep(134538481*2.0, 2)) + testthat::skip_on_covr() expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) }) From bc4da7315498a83051140b65f8afac29e183466d Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Fri, 20 Dec 2024 15:15:58 -0600 Subject: [PATCH 07/10] doco cleanup for CRAN checks --- DESCRIPTION | 2 +- R/zarr-array.R | 158 +++++++++++++++++++++++------------------------ R/zarr-group.R | 16 ++--- README.md | 6 +- man/ZarrArray.Rd | 113 --------------------------------- man/ZarrGroup.Rd | 9 --- 6 files changed, 90 insertions(+), 214 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 38ac3ed..848a8aa 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: pizzarr Type: Package -Title: Slice into Zarr arrays in R +Title: Slice into Zarr Arrays in R Version: 0.1.0 Authors@R: c( person( diff --git a/R/zarr-array.R b/R/zarr-array.R index 2c1bb2f..6e3ef8c 100644 --- a/R/zarr-array.R +++ b/R/zarr-array.R @@ -16,71 +16,70 @@ ZarrArray <- R6::R6Class("ZarrArray", # store Array store, already initialized. #' @keywords internal store = NULL, - #' chunk_store Separate storage for chunks. If not provided, `store` will be used for storage of both chunks and metadata. + # chunk_store Separate storage for chunks. If not provided, `store` will be used for storage of both chunks and metadata. #' @keywords internal chunk_store = NULL, - #' path Storage path. String, optional. + # path Storage path. String, optional. #' @keywords internal path = NULL, - #' read_only True if array should be protected against modification. + # read_only True if array should be protected against modification. #' @keywords internal read_only = NULL, - #' synchronizer Array synchronizer. Object, optional. + # synchronizer Array synchronizer. Object, optional. #' @keywords internal synchronizer = NULL, - #' cache_metadata If True (default), array configuration metadata will be cached. If False, metadata will be reloaded prior to all data access and modification. + # cache_metadata If True (default), array configuration metadata will be cached. If False, metadata will be reloaded prior to all data access and modification. #' @keywords internal cache_metadata = NULL, - #' cache_attrs If True (default), user attributes will be cached. If False, attributes will be reloaded prior to all data access and modification. + # cache_attrs If True (default), user attributes will be cached. If False, attributes will be reloaded prior to all data access and modification. #' @keywords internal cache_attrs = NULL, - #' write_empty_chunks If True, all chunks will be stored regardless of their contents. If False (default), each chunk is compared to the array's fill value prior to storing. If a chunk is uniformly equal to the fill value, then that chunk is not be stored, and the store entry for that chunk's key is deleted. + # write_empty_chunks If True, all chunks will be stored regardless of their contents. If False (default), each chunk is compared to the array's fill value prior to storing. If a chunk is uniformly equal to the fill value, then that chunk is not be stored, and the store entry for that chunk's key is deleted. #' @keywords internal write_empty_chunks = NULL, - #' key_prefix TODO + # key_prefix TODO #' @keywords internal key_prefix = NULL, - #' is_view TODO + # is_view TODO #' @keywords internal is_view = NULL, - #' attrs TODO + # attrs TODO #' @keywords internal attrs = NULL, - #' meta TODO + # meta TODO #' @keywords internal meta = NULL, - #' shape TODO + # shape TODO #' @keywords internal shape = NULL, - #' chunks TODO + # chunks TODO #' @keywords internal chunks = NULL, - #' dtype TODO + # dtype TODO #' @keywords internal dtype = NULL, - #' fill_value TODO + # fill_value TODO #' @keywords internal fill_value = NULL, - #' order TODO + # order TODO #' @keywords internal order = NULL, - #' dimension_separator TODO + # dimension_separator TODO #' @keywords internal dimension_separator = NULL, - #' compressor TODO + # compressor TODO #' @keywords internal compressor = NULL, - #' filters TODO + # filters TODO #' @keywords internal filters = NULL, - #' vindex TODO + # vindex TODO #' @keywords internal vindex = NULL, - #' oindex TODO + # oindex TODO #' @keywords internal oindex = NULL, - #' method_description - #' (Re)load metadata from store without synchronization (file locking). + # (Re)load metadata from store without synchronization (file locking). load_metadata_nosync = function() { mkey <- paste0(private$key_prefix, ARRAY_META_KEY) @@ -135,28 +134,28 @@ ZarrArray <- R6::R6Class("ZarrArray", } private$dtype <- normalize_dtype(meta$dtype, object_codec = object_codec) }, - #' method_description - #' Load or reload metadata from store. + # method_description + # Load or reload metadata from store. load_metadata = function() { private$load_metadata_nosync() # TODO: support for synchronization }, - #' method_description - #' Referesh metadata if not cached without synchronization (file locking). + # method_description + # Referesh metadata if not cached without synchronization (file locking). refresh_metadata_nosync = function() { if(!private$cache_metadata && !private$is_view) { private$load_metadata_nosync() } }, - #' method_description - #' Refresh metadata from store if not cached. + # method_description + # Refresh metadata from store if not cached. refresh_metadata = function() { if(!private$cache_metadata) { private$load_metadata() } }, - #' method_description - #' Write metadata to store without synchronization (file locking). + # method_description + # Write metadata to store without synchronization (file locking). flush_metadata_nosync = function() { if(private$is_view) { stop("Operation not permitted for views") @@ -188,14 +187,14 @@ ZarrArray <- R6::R6Class("ZarrArray", encoded_meta <- private$store$metadata_class$encode_array_metadata(zarray_meta) private$store$set_item(mkey, encoded_meta) }, - #' method_description - #' TODO + # method_description + # TODO chunk_key = function(chunk_coords) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L2063 return(paste0(private$key_prefix, do.call(paste, c(as.list(chunk_coords), sep = private$dimension_separator)))) }, - #' method_description - #' TODO + # method_description + # TODO compute_cdata_shape = function() { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L428 if(is.null(private$shape)) { @@ -212,8 +211,8 @@ ZarrArray <- R6::R6Class("ZarrArray", cdata_shape <- as.numeric(cdata_shape) return(cdata_shape) }, - #' method_description - #' Resize an array without synchronization (file locking) + # method_description + # Resize an array without synchronization (file locking) resize_nosync = function(...) { # Note: When resizing an array, the data are not rearranged in any way. # If one or more dimensions are shrunk, any chunks falling outside the @@ -252,8 +251,8 @@ ZarrArray <- R6::R6Class("ZarrArray", } } }, - #' method_description - #' TODO + # method_description + # TODO get_basic_selection_zd = function(selection = NA, out = NA, fields = NA) { # Special case basic selection for zero-dimensional array # Check selection is valid @@ -296,14 +295,14 @@ ZarrArray <- R6::R6Class("ZarrArray", } return(out) }, - #' method_description - #' TODO + # method_description + # TODO get_basic_selection_nd = function(selection = NA, out = NA, fields = NA) { indexer <- BasicIndexer$new(selection, self) return(private$get_selection(indexer, out = out, fields = fields)) }, - #' method_description - #' TODO + # method_description + # TODO get_selection = function(indexer, out = NA, fields = NA) { # Reference: https://github.com/gzuidhof/zarr.js/blob/292804/src/core/index.ts#L304 # We iterate over all chunks which overlap the selection and thus contain data @@ -344,8 +343,8 @@ ZarrArray <- R6::R6Class("ZarrArray", return(out) }, - #' method_description - #' TODO + # method_description + # TODO set_basic_selection_zd = function(selection, value, fields = NA) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/core.py#L1625 @@ -407,14 +406,14 @@ ZarrArray <- R6::R6Class("ZarrArray", c_data <- private$encode_chunk(chunk_raw) self$get_chunk_store()$set_item(c_key, c_data) }, - #' method_description - #' TODO + # method_description + # TODO set_basic_selection_nd = function(selection, value, fields = NA) { indexer <- BasicIndexer$new(selection, self) return(private$set_selection(indexer, value = value, fields = fields)) }, - #' method_description - #' TODO + # method_description + # TODO set_selection = function(indexer, value, fields = NA) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L1682 # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L566 @@ -462,14 +461,14 @@ ZarrArray <- R6::R6Class("ZarrArray", return() } }, - #' method_description - #' TODO + # method_description + # TODO process_chunk = function(out, cdata, chunk_selection, drop_axes, out_is_ndarray, fields, out_selection, partial_read_decode = FALSE) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L1755 # TODO }, - #' method_description - #' TODO + # method_description + # TODO get_chunk_value = function(proj, indexer, value, selection_shape) { # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L550 @@ -488,13 +487,13 @@ ZarrArray <- R6::R6Class("ZarrArray", } return(chunk_value) }, - #' method_description - #' TODO + # method_description + # TODO chunk_buffer_to_raw_array = function(decoded_chunk) { # TODO }, - #' method_description - #' For parallel usage + # method_description + # For parallel usage chunk_getitem_part1 = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { if(length(chunk_coords) != length(private$chunks)) { stop("Inconsistent shapes: chunkCoordsLength: ${chunkCoords.length}, cDataShapeLength: ${this.chunkDataShape.length}") @@ -514,8 +513,8 @@ ZarrArray <- R6::R6Class("ZarrArray", }) return(result) }, - #' method_description - #' For parallel usage + # method_description + # For parallel usage chunk_getitem_part2 = function(part1_result, chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { c_key <- private$chunk_key(chunk_coords) @@ -557,8 +556,8 @@ ZarrArray <- R6::R6Class("ZarrArray", } } }, - #' method_description - #' For non-parallel usage + # method_description + # For non-parallel usage chunk_getitem = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { # TODO # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L380 @@ -605,13 +604,13 @@ ZarrArray <- R6::R6Class("ZarrArray", } }) }, - #' method_description - #' TODO + # method_description + # TODO chunk_getitems = function(lchunk_coords, lchunk_selection, out, lout_selection, drop_axes = NA, fields = NA) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO chunk_setitem = function(chunk_coords, chunk_selection, value, fields = NA) { # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L625 @@ -688,31 +687,31 @@ ZarrArray <- R6::R6Class("ZarrArray", chunk_data <- private$encode_chunk(chunk_raw) self$get_chunk_store()$set_item(chunk_key, chunk_data) }, - #' method_description - #' TODO + # method_description + # TODO chunk_setitem_nosync = function(chunk_coords, chunk_selection, value, fields = NA) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO chunk_setitems = function(lchunk_coords, lchunk_selection, values, fields = NA) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO process_for_setitem = function(ckey, chunk_selection, value, fields = NA) { # TODO }, chunk_delitem = function(ckey) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO chunk_delitems = function(ckeys) { # TODO }, - #' method_description - #' TODO + # method_description + # TODO decode_chunk = function(cdata, start = NA, nitems = NA, expected_shape = NA) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/core.py#L2066 # decompress @@ -753,8 +752,8 @@ ZarrArray <- R6::R6Class("ZarrArray", # ensure correct chunk shape return(chunk) }, - #' method_description - #' TODO + # method_description + # TODO encode_chunk = function(chunk_as_raw) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0e6cdc04c6413e14f57f61d389972ea937c/zarr/core.py#L2105 @@ -784,8 +783,8 @@ ZarrArray <- R6::R6Class("ZarrArray", return(cdata) }, - #' method_description - #' TODO + # method_description + # TODO append_nosync = function(data, axis = 0) { # Reference: https://github.com/zarr-developers/zarr-python/blob/5dd4a0/zarr/core.py#L2141 # TODO @@ -1228,7 +1227,6 @@ ZarrArray <- R6::R6Class("ZarrArray", }, #' @description #' Convert Zarr object to R array (for S3 method). Note that this loads all data into memory. - #' #' @return array as.array = function() { return(self$get_item("...")$data) diff --git a/R/zarr-group.R b/R/zarr-group.R index 1c58657..e0b40ec 100644 --- a/R/zarr-group.R +++ b/R/zarr-group.R @@ -13,28 +13,28 @@ ZarrGroup <- R6::R6Class("ZarrGroup", # store TODO #' @keywords internal store = NULL, - #' path TODO + # path TODO #' @keywords internal path = NULL, - #' read_only TODO + # read_only TODO #' @keywords internal read_only = NULL, - #' chunk_store TODO + # chunk_store TODO #' @keywords internal chunk_store = NULL, - #' cache_attrs TODO + # cache_attrs TODO #' @keywords internal cache_attrs = NULL, - #' synchronizer TODO + # synchronizer TODO #' @keywords internal synchronizer = NULL, - #' key_prefix TODO + # key_prefix TODO #' @keywords internal key_prefix = NULL, - #' meta TODO + # meta TODO #' @keywords internal meta = NULL, - #' attrs TODO + # attrs TODO #' @keywords internal attrs = NULL, item_path = function(item) { diff --git a/README.md b/README.md index 9e9fda6..530b106 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ print(selection$data) | `Unicode` | ✔ / ✔ | Converted to `character` in R. | | `void *` | ❌ / ❌ | | | Structured data types | ❌ / ❌ | | -| Object data type - [VLenUTF8](https://numcodecs.readthedocs.io/en/stable/vlen.html#vlenutf8) | ✔ / ✔ | Converted to `character` in R. | +| Object data type - [VLenUTF8](https://numcodecs.readthedocs.io/en/stable/other/vlen.html#numcodecs.vlen.VLenUTF8) | ✔ / ✔ | Converted to `character` in R. | Note: no effort is made to assess loss of precision due to conversion. @@ -120,14 +120,14 @@ pkgdown::build_site() - Note: `pizzarr` has an optional dependency on Rarr for Blosc (de)compression. - R package development - [R packages](https://r-pkgs.org/) - - [roxygen2 syntax](https://cran.r-project.org/web/packages/roxygen2/vignettes/rd-formatting.html) + - [roxygen2 syntax](https://roxygen2.r-lib.org/articles/rd-formatting.html) - [R6](https://r6.r-lib.org/index.html) - [R6 roxygen2 syntax](https://www.tidyverse.org/blog/2019/11/roxygen2-7-0-0/#r6-documentation) - [pkgdown](https://pkgdown.r-lib.org/) - Zarr implementation - [zarr_implementations](https://github.com/zarr-developers/zarr_implementations) - [zarr-python](https://github.com/zarr-developers/zarr-python) - - [LZ4 and ZSTD compression in R](https://github.com/traversc/qs) + - [LZ4 and ZSTD compression in R](https://github.com/qsbase/qs) - [zarr.js](https://github.com/gzuidhof/zarr.js) - [zarrita.js](https://github.com/manzt/zarrita.js) - [v2 spec](https://zarr.readthedocs.io/en/stable/spec/v2.html) diff --git a/man/ZarrArray.Rd b/man/ZarrArray.Rd index 8c4d8ed..916ee17 100644 --- a/man/ZarrArray.Rd +++ b/man/ZarrArray.Rd @@ -13,120 +13,7 @@ Instantiate an array from an initialized store. \details{ The Zarr Array class. } -\keyword{(Re)load} -\keyword{(default),} -\keyword{(file} -\keyword{Array} -\keyword{False} -\keyword{False,} -\keyword{For} -\keyword{If} -\keyword{Load} -\keyword{Object,} -\keyword{Referesh} -\keyword{Refresh} -\keyword{Resize} -\keyword{Separate} -\keyword{Storage} -\keyword{String,} -\keyword{TODO} -\keyword{True} -\keyword{True,} -\keyword{Write} -\keyword{`store`} -\keyword{a} -\keyword{access} -\keyword{against} -\keyword{all} -\keyword{an} -\keyword{and} -\keyword{array} -\keyword{array's} -\keyword{attributes} -\keyword{attrs} -\keyword{be} -\keyword{both} -\keyword{cache_attrs} -\keyword{cache_metadata} -\keyword{cached} -\keyword{cached.} -\keyword{chunk} -\keyword{chunk's} -\keyword{chunk_store} -\keyword{chunks} -\keyword{chunks.} -\keyword{compared} -\keyword{compressor} -\keyword{configuration} -\keyword{contents.} -\keyword{data} -\keyword{deleted.} -\keyword{dimension_separator} -\keyword{dtype} -\keyword{each} -\keyword{entry} -\keyword{equal} -\keyword{fill} -\keyword{fill_value} -\keyword{filters} -\keyword{for} -\keyword{from} -\keyword{if} \keyword{internal} -\keyword{is} -\keyword{is_view} -\keyword{key} -\keyword{key_prefix} -\keyword{locking)} -\keyword{locking).} -\keyword{meta} -\keyword{metadata} -\keyword{metadata.} -\keyword{method_description} -\keyword{modification.} -\keyword{non-parallel} -\keyword{not} -\keyword{of} -\keyword{oindex} -\keyword{optional.} -\keyword{or} -\keyword{order} -\keyword{parallel} -\keyword{path} -\keyword{path.} -\keyword{prior} -\keyword{protected} -\keyword{provided,} -\keyword{read_only} -\keyword{regardless} -\keyword{reload} -\keyword{reloaded} -\keyword{shape} -\keyword{should} -\keyword{storage} -\keyword{store} -\keyword{store.} -\keyword{stored} -\keyword{stored,} -\keyword{storing.} -\keyword{synchronization} -\keyword{synchronizer} -\keyword{synchronizer.} -\keyword{that} -\keyword{the} -\keyword{their} -\keyword{then} -\keyword{to} -\keyword{uniformly} -\keyword{usage} -\keyword{used} -\keyword{user} -\keyword{value} -\keyword{value,} -\keyword{vindex} -\keyword{will} -\keyword{without} -\keyword{write_empty_chunks} \section{Methods}{ \subsection{Public methods}{ \itemize{ diff --git a/man/ZarrGroup.Rd b/man/ZarrGroup.Rd index 421b547..47cbefc 100644 --- a/man/ZarrGroup.Rd +++ b/man/ZarrGroup.Rd @@ -10,16 +10,7 @@ Instantiate a group from an initialized store. \details{ The Zarr Group class. } -\keyword{TODO} -\keyword{attrs} -\keyword{cache_attrs} -\keyword{chunk_store} \keyword{internal} -\keyword{key_prefix} -\keyword{meta} -\keyword{path} -\keyword{read_only} -\keyword{synchronizer} \section{Methods}{ \subsection{Public methods}{ \itemize{ From 92ca03a082cfd6ab239ec463f007da761be385e8 Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Wed, 19 Feb 2025 21:14:08 -0600 Subject: [PATCH 08/10] short cut to avoid some web requests --- R/stores.R | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/R/stores.R b/R/stores.R index dcbec45..16fb974 100644 --- a/R/stores.R +++ b/R/stores.R @@ -377,6 +377,10 @@ HttpStore <- R6::R6Class("HttpStore", key <- item_to_key(item) path <- paste(private$base_path, key, sep="/") + ret <- try_from_zmeta(key, self) + + if(!is.null(ret)) return(ret) + parallel_option <- getOption("pizzarr.parallel_read_enabled") parallel_option <- parse_parallel_option(parallel_option) is_parallel <- is_truthy_parallel_option(parallel_option) From b44cefc9bf84c382fa26529d5b62fc6ca0005a17 Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Thu, 20 Feb 2025 08:11:37 -0600 Subject: [PATCH 09/10] apply_func instead of FUN --- R/zarr-array.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/zarr-array.R b/R/zarr-array.R index 6e3ef8c..0f2fd96 100644 --- a/R/zarr-array.R +++ b/R/zarr-array.R @@ -326,7 +326,7 @@ ZarrArray <- R6::R6Class("ZarrArray", if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) parts <- indexer$iter() - part1_results <- ps$FUN(parts, function(proj, cl = NA) { + part1_results <- ps$apply_func(parts, function(proj, cl = NA) { private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) }, cl = ps$cl) @@ -452,7 +452,7 @@ ZarrArray <- R6::R6Class("ZarrArray", if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) parts <- indexer$iter() - ps$FUN(parts, function(proj, cl = NA) { + ps$apply_func(parts, function(proj, cl = NA) { chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape) private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value) NULL @@ -1339,5 +1339,5 @@ get_parallel_settings <- function(on_windows = (.Platform$OS.type == "windows"), } } - list(FUN = apply_func, cl = cl, close = close) + list(apply_func = apply_func, cl = cl, close = close) } \ No newline at end of file From b214e5ca3d803666cf117d7dd4c855a0a0f2e981 Mon Sep 17 00:00:00 2001 From: David Blodgett Date: Thu, 20 Feb 2025 11:14:46 -0600 Subject: [PATCH 10/10] clean up parallel options and documentation --- R/options.R | 19 +++++++++++++----- R/stores.R | 4 ++-- R/utils.R | 12 ++++++------ R/zarr-array.R | 31 +++++++++++++++++++++++------- man/HttpStore.Rd | 2 +- man/get_parallel_settings.Rd | 28 +++++++++++++++++++++++++++ man/pizzarr_option_defaults.Rd | 9 +++++++-- tests/testthat/test-01-parallel.R | 32 ++++++++++++++++--------------- vignettes/parallel.Rmd | 8 ++++---- 9 files changed, 103 insertions(+), 42 deletions(-) create mode 100644 man/get_parallel_settings.Rd diff --git a/R/options.R b/R/options.R index c886bc7..0fd5b29 100644 --- a/R/options.R +++ b/R/options.R @@ -1,21 +1,29 @@ # Adapted from https://github.com/IRkernel/IRkernel/blob/master/R/options.r #' pizzarr_option_defaults +#' @description +#' * pizzarr.http_store_cache_time_seconds how long to cache web requests +#' * pizzarr.parallel_backend "future", a cluster object, or an integer (if not on windows) +#' * pizzarr.parallel_write_enabled logical, whether to use parallel backend for writing +#' * pizzarr.progress_bar logical whether to use `pbapply` to emit a progress bar #' @export pizzarr_option_defaults <- list( pizzarr.http_store_cache_time_seconds = 3600, - pizzarr.parallel_read_enabled = FALSE, - pizzarr.parallel_write_enabled = FALSE + pizzarr.parallel_backend = NA, + pizzarr.parallel_write_enabled = FALSE, + pizzarr.progress_bar = FALSE ) #' @keywords internal parse_parallel_option <- function(val) { + if(is.na(val)) return(val) + if(inherits(val, "cluster")) { return(val) } - if(val == "future") { + if(!is.na(val) && val == "future") { return("future") } @@ -34,8 +42,9 @@ parse_parallel_option <- function(val) { #' @keywords internal from_env <- list( PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer, - PIZZARR_PARALLEL_READ_ENABLED = parse_parallel_option, - PIZZARR_PARALLEL_WRITE_ENABLED = parse_parallel_option + PIZZARR_PARALLEL_BACKEND = parse_parallel_option, + PIZZARR_PARALLEL_WRITE_ENABLED = as.logical, + PIZZARR_PROGRESS_BAR = as.logical ) # converts e.g. jupyter.log_level to JUPYTER_LOG_LEVEL diff --git a/R/stores.R b/R/stores.R index 16fb974..38d258c 100644 --- a/R/stores.R +++ b/R/stores.R @@ -348,7 +348,7 @@ MemoryStore <- R6::R6Class("MemoryStore", #' @description #' Store class that uses HTTP requests. #' Read-only. Depends on the `crul` package. -#' @details For parallel operation, set the "pizzarr.parallel_read_enabled" option +#' @details For parallel operation, set the "pizzarr.parallel_backend" option #' to one of: #' * `"future"` if a future plan has been set up #' * `integer` if you would like a one-time use cluster created per call @@ -381,7 +381,7 @@ HttpStore <- R6::R6Class("HttpStore", if(!is.null(ret)) return(ret) - parallel_option <- getOption("pizzarr.parallel_read_enabled") + parallel_option <- getOption("pizzarr.parallel_backend", NA) parallel_option <- parse_parallel_option(parallel_option) is_parallel <- is_truthy_parallel_option(parallel_option) diff --git a/R/utils.R b/R/utils.R index a88129f..749aab9 100644 --- a/R/utils.R +++ b/R/utils.R @@ -374,12 +374,12 @@ item_to_key <- function(item) { #' @keywords internal is_truthy_parallel_option <- function(val) { - if(inherits(val, "cluster")) { - return(TRUE) - } - if(val == "future") { - return(TRUE) - } + if(is.na(val)) return(FALSE) + + if(inherits(val, "cluster")) return(TRUE) + + if(val == "future") return(TRUE) + return(as.logical(as.integer(val))) } diff --git a/R/zarr-array.R b/R/zarr-array.R index 0f2fd96..7c6bba9 100644 --- a/R/zarr-array.R +++ b/R/zarr-array.R @@ -322,11 +322,11 @@ ZarrArray <- R6::R6Class("ZarrArray", return(out) } - ps <- get_parallel_settings(parallel_option = getOption("pizzarr.parallel_read_enabled", FALSE)) + ps <- get_parallel_settings(parallel_option = getOption("pizzarr.parallel_backend", NA)) if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) parts <- indexer$iter() - part1_results <- ps$apply_func(parts, function(proj, cl = NA) { + part1_results <- ps$apply_func(parts, function(proj) { private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) }, cl = ps$cl) @@ -448,11 +448,17 @@ ZarrArray <- R6::R6Class("ZarrArray", stop("Unknown data type for setting :(") } - ps <- get_parallel_settings(parallel_option = getOption("pizzarr.parallel_write_enabled", FALSE)) - if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) + par_opt <- NA + + if(getOption("pizzarr.parallel_write_enabled", FALSE)) { + par_opt <- getOption("pizzarr.parallel_backend", NA) + } + ps <- get_parallel_settings(parallel_option = par_opt) + if(ps$close) on.exit(try(parallel::stopCluster(ps$cl), silent = TRUE)) + parts <- indexer$iter() - ps$apply_func(parts, function(proj, cl = NA) { + ps$apply_func(parts, function(proj) { chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape) private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value) NULL @@ -1263,8 +1269,19 @@ as.array.ZarrArray = function(x, ...) { x$as.array() } +#' get parallel settings +#' @keywords internal +#' @description +#' given information about user preferences and environment conditions, returns a function +#' and cluster object. +#' @param on_windows logical indicating if windows restrictions should apply +#' @param parallel_option integer, or "future" to control how parallelization occurs. +#' @param progress logical to control whether `pbapply` is used such that progress is printed. +#' @return list containing the function to use in parallel operations, a cluster object to be used +#' in parallel operations, and whether or not the cluster object needs to be closed. +#' get_parallel_settings <- function(on_windows = (.Platform$OS.type == "windows"), - parallel_option = getOption("pizzarr.parallel_read_enabled", FALSE), + parallel_option = getOption("pizzarr.parallel_backend", NA), progress = getOption("pizzarr.progress_bar", FALSE)) { cl <- parse_parallel_option(parallel_option) @@ -1284,7 +1301,7 @@ get_parallel_settings <- function(on_windows = (.Platform$OS.type == "windows"), if(progress & !requireNamespace("pbapply", quietly = TRUE)) { # NOTEST progress <- FALSE - warning("Parallel progress bar operations requires the 'pbapply' package.") + warning("progress bar operations requires the 'pbapply' package.") } if(isTRUE(cl == "future")) { diff --git a/man/HttpStore.Rd b/man/HttpStore.Rd index 7551266..52a0c17 100644 --- a/man/HttpStore.Rd +++ b/man/HttpStore.Rd @@ -9,7 +9,7 @@ Store class that uses HTTP requests. Read-only. Depends on the \code{crul} package. } \details{ -For parallel operation, set the "pizzarr.parallel_read_enabled" option +For parallel operation, set the "pizzarr.parallel_backend" option to one of: \itemize{ \item \code{"future"} if a future plan has been set up diff --git a/man/get_parallel_settings.Rd b/man/get_parallel_settings.Rd new file mode 100644 index 0000000..0ff53ac --- /dev/null +++ b/man/get_parallel_settings.Rd @@ -0,0 +1,28 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/zarr-array.R +\name{get_parallel_settings} +\alias{get_parallel_settings} +\title{get parallel settings} +\usage{ +get_parallel_settings( + on_windows = (.Platform$OS.type == "windows"), + parallel_option = getOption("pizzarr.parallel_backend", NA), + progress = getOption("pizzarr.progress_bar", FALSE) +) +} +\arguments{ +\item{on_windows}{logical indicating if windows restrictions should apply} + +\item{parallel_option}{integer, or "future" to control how parallelization occurs.} + +\item{progress}{logical to control whether \code{pbapply} is used such that progress is printed.} +} +\value{ +list containing the function to use in parallel operations, a cluster object to be used +in parallel operations, and whether or not the cluster object needs to be closed. +} +\description{ +given information about user preferences and environment conditions, returns a function +and cluster object. +} +\keyword{internal} diff --git a/man/pizzarr_option_defaults.Rd b/man/pizzarr_option_defaults.Rd index 2bca041..48f4231 100644 --- a/man/pizzarr_option_defaults.Rd +++ b/man/pizzarr_option_defaults.Rd @@ -5,12 +5,17 @@ \alias{pizzarr_option_defaults} \title{pizzarr_option_defaults} \format{ -An object of class \code{list} of length 3. +An object of class \code{list} of length 4. } \usage{ pizzarr_option_defaults } \description{ -pizzarr_option_defaults +\itemize{ +\item pizzarr.http_store_cache_time_seconds how long to cache web requests +\item pizzarr.parallel_backend "future", a cluster object, or an integer (if not on windows) +\item pizzarr.parallel_write_enabled logical, whether to use parallel backend for writing +\item pizzarr.progress_bar logical whether to use \code{pbapply} to emit a progress bar +} } \keyword{datasets} diff --git a/tests/testthat/test-01-parallel.R b/tests/testthat/test-01-parallel.R index 7e1431d..45dcb9d 100644 --- a/tests/testthat/test-01-parallel.R +++ b/tests/testthat/test-01-parallel.R @@ -55,20 +55,21 @@ get_dog_arr <- function(slow_setting = FALSE) { } run_parallel_get <- function(num_workers) { - options(pizzarr.parallel_read_enabled = num_workers) + options(pizzarr.parallel_backend = num_workers) options(pizzarr.progress_bar = FALSE) zarr_arr <- get_dog_arr() arr <- zarr_arr$get_item("...")$data - options(pizzarr.parallel_read_enabled = FALSE) + options(pizzarr.parallel_backend = NA) return(sum(arr)) } run_parallel_set <- function(num_workers) { - options(pizzarr.parallel_write_enabled = num_workers) + options(pizzarr.parallel_write_enabled = TRUE) + options(pizzarr.parallel_backend = num_workers) options(pizzarr.progress_bar = FALSE) zarr_arr <- get_dog_arr(slow_setting = TRUE) @@ -80,6 +81,7 @@ run_parallel_set <- function(num_workers) { doubled_arr <- zarr_arr$get_item("...")$data options(pizzarr.parallel_write_enabled = FALSE) + options(pizzarr.parallel_backend = NA) return(sum(doubled_arr)) } @@ -149,20 +151,20 @@ test_that("get_parallel_settings", { testthat::skip_on_covr() # need to debug why this breaks covr run # Case 1: not parallel - ps <- get_parallel_settings(parallel_option = FALSE) + ps <- get_parallel_settings(parallel_option = NA) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { lapply(X, FUN, ...) })) - expect_equal(ps$cl, FALSE) + expect_equal(ps$cl, NA) # Case 2a1: Future, progress ps <- get_parallel_settings(parallel_option = "future", progress = TRUE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { pbapply::pblapply(X, FUN, ..., future.packages = "Rarr", @@ -176,7 +178,7 @@ test_that("get_parallel_settings", { ps <- get_parallel_settings(parallel_option = "future", progress = FALSE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { future.apply::future_lapply(X, FUN, ..., future.packages = "Rarr", @@ -191,7 +193,7 @@ test_that("get_parallel_settings", { parallel_option = 2, progress = TRUE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { pbapply::pblapply(X, FUN, ..., cl = cl) })) @@ -203,7 +205,7 @@ test_that("get_parallel_settings", { ps <- get_parallel_settings(parallel_option = 1, progress = TRUE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { pbapply::pblapply(X, FUN, ..., cl = cl) })) @@ -216,7 +218,7 @@ test_that("get_parallel_settings", { parallel_option = 2, progress = TRUE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { pbapply::pblapply(X, FUN, ..., cl = cl) })) @@ -229,7 +231,7 @@ test_that("get_parallel_settings", { parallel_option = 2, progress = TRUE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { pbapply::pblapply(X, FUN, ..., cl = cl) })) @@ -241,7 +243,7 @@ test_that("get_parallel_settings", { ps <- get_parallel_settings(parallel_option = 1, progress = FALSE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { lapply(X, FUN, ...) })) @@ -254,7 +256,7 @@ test_that("get_parallel_settings", { parallel_option = 2, progress = FALSE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { parallel::parLapply(cl, X, FUN, ...) })) @@ -267,7 +269,7 @@ test_that("get_parallel_settings", { parallel_option = 2, progress = FALSE) - expect_equal(format(ps$FUN), + expect_equal(format(ps$apply_func), format(function(X, FUN, ..., cl = NULL) { parallel::mclapply(X, FUN, ..., mc.cores = cl) })) diff --git a/vignettes/parallel.Rmd b/vignettes/parallel.Rmd index bacda04..dee59e6 100644 --- a/vignettes/parallel.Rmd +++ b/vignettes/parallel.Rmd @@ -55,7 +55,7 @@ Provide an integer >= 2 or a cluster object to the option to use forking-based p This value will be passed to the `cl` parameter of `parallel::parLapply`, `future.apply::future_lapply`, or if the option "pizzarr.progress_bar" is `TRUE`, `pbapply::pblapply`. ```{r} -options(pizzarr.parallel_read_enabled = 4) +options(pizzarr.parallel_backend = 4) root <- pizzarr_sample("dog.ome.zarr") store <- SlowDirectoryStore$new(root) @@ -85,7 +85,7 @@ To use the `future` backend for `pbapply`, set the value of the option to the st Cluster-based: ```{r} -options(pizzarr.parallel_read_enabled = "future") +options(pizzarr.parallel_backend = "future") cl <- parallel::makeCluster(2) future::plan(future::cluster, workers = cl) @@ -102,7 +102,7 @@ parallel::stopCluster(cl) Multisession-based: ```{r} -options(pizzarr.parallel_read_enabled = "future") +options(pizzarr.parallel_backend = "future") oldplan <- future::plan() future::plan(future::multisession, workers = 4) @@ -122,7 +122,7 @@ To return to sequential mode, run: ```{r} options( - pizzarr.parallel_read_enabled = FALSE, + pizzarr.parallel_backend = NA, pizzarr.parallel_write_enabled = FALSE ) ```