diff --git a/DESCRIPTION b/DESCRIPTION
index 5096f5a3..18a614b1 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: Spectra
 Title: Spectra Infrastructure for Mass Spectrometry Data
-Version: 1.13.1
+Version: 1.13.2
 Description: The Spectra package defines an efficient infrastructure
     for storing and handling mass spectrometry spectra and functionality to
     subset, process, visualize and compare spectra data. It provides different
diff --git a/NAMESPACE b/NAMESPACE
index 9424c53d..386f4c31 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -1,5 +1,6 @@
 # Generated by roxygen2: do not edit by hand

+export("processingChunkSize<-")
 export(MsBackendCached)
 export(MsBackendDataFrame)
 export(MsBackendHdf5Peaks)
@@ -27,6 +28,8 @@ export(plotMzDelta)
 export(plotSpectra)
 export(plotSpectraOverlay)
 export(ppm)
+export(processingChunkFactor)
+export(processingChunkSize)
 export(processingLog)
 export(reduceSpectra)
 export(scalePeaks)
@@ -63,6 +66,7 @@ exportMethods(addProcessing)
 exportMethods(backendBpparam)
 exportMethods(backendInitialize)
 exportMethods(backendMerge)
+exportMethods(backendParallelFactor)
 exportMethods(bin)
 exportMethods(c)
 exportMethods(centroided)
diff --git a/NEWS.md b/NEWS.md
index 68e28932..bd40e31c 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,5 +1,17 @@
 # Spectra 1.13

+## Changes in 1.13.2
+
+- Add support for chunk-wise (parallel) processing of `Spectra`: the new
+  functions `processingChunkSize`, `backendParallelFactor` and
+  `processingChunkFactor` set or get the definition of the chunks for
+  parallel processing. All functions working on peaks data use this
+  mechanism, which is implemented in the internal `.peaksapply` function.
+  The `Spectra` object gains a new slot `"processingChunkSize"` that is used
+  to define the size of the processing chunks for the `Spectra`. See also
+  [issue #304](https://github.com/rformassspectrometry/Spectra/issues/304).
+  This enables the processing of very large data sets.
+
 ## Changes in 1.13.1

 - Fix issue with `bin` function (see
diff --git a/R/AllGenerics.R b/R/AllGenerics.R
index 7a5fdbbb..772436df 100644
--- a/R/AllGenerics.R
+++ b/R/AllGenerics.R
@@ -11,6 +11,9 @@ setGeneric("backendMerge", def = function(object, ...)
     standardGeneric("backendMerge"), valueClass = "MsBackend")

 #' @rdname hidden_aliases
+setGeneric("backendParallelFactor", def = function(object, ...)
+    standardGeneric("backendParallelFactor"))
+#' @rdname hidden_aliases
 setMethod("bin", "numeric", MsCoreUtils::bin)

 setGeneric("combinePeaks", function(object, ...)
     standardGeneric("combinePeaks"))
diff --git a/R/MsBackend.R b/R/MsBackend.R
index 79970a57..bdda7456 100644
--- a/R/MsBackend.R
+++ b/R/MsBackend.R
@@ -9,6 +9,8 @@
 #' @aliases supportsSetBackend
 #' @aliases backendBpparam
 #' @aliases backendInitialize
+#' @aliases backendParallelFactor,MsBackendMzR-method
+#' @aliases backendParallelFactor,MsBackendHdf5Peaks-method
 #'
 #' @description
 #'
@@ -212,7 +214,9 @@
 #'     because they contain a connection to a database that can not be
 #'     shared across processes) should extend this method to return only
 #'     `SerialParam()` and hence disable parallel processing for (most)
-#'     methods and functions.
+#'     methods and functions. See also `backendParallelFactor` for a
+#'     function to provide a preferred splitting of the backend for parallel
+#'     processing.
 #'
 #' - `backendInitialize`: initialises the backend. This method is
 #'   supposed to be called rights after creating an instance of the
 #'   backend class and should prepare the backend (e.g. set the data
@@ -233,6 +237,14 @@
 #'   instance. All objects to be merged have to be of the same type (e.g.
 #'   [MsBackendDataFrame()]).
 #'
+#' - `backendParallelFactor`: returns a `factor` defining an optimal
+#'   (preferred) way to split the backend for the parallel processing used
+#'   by all peaks data accessor and data manipulation functions. The default
+#'   implementation returns a `factor` of length 0 (`factor()`), thus
+#'   providing no default splitting. `backendParallelFactor` for
+#'   `MsBackendMzR`, on the other hand, returns `factor(dataStorage(object))`,
+#'   hence suggesting to split the object by data file.
+#'
 #' - `dataOrigin`: gets a `character` of length equal to the number of spectra
 #'   in `object` with the *data origin* of each spectrum. This could e.g. be
 #'   the mzML file from which the data was read.
@@ -849,6 +861,13 @@ setMethod("backendMerge", "MsBackend", function(object, ...) {
     stop("Not implemented for ", class(object), ".")
 })

+#' @exportMethod backendParallelFactor
+#'
+#' @rdname MsBackend
+setMethod("backendParallelFactor", "MsBackend", function(object, ...) {
+    factor()
+})
+
 #' @rdname MsBackend
 setMethod("export", "MsBackend", function(object, ...) {
     stop(class(object), " does not support export of data; please provide a ",
diff --git a/R/MsBackendHdf5Peaks.R b/R/MsBackendHdf5Peaks.R
index feb82fcf..e5482803 100644
--- a/R/MsBackendHdf5Peaks.R
+++ b/R/MsBackendHdf5Peaks.R
@@ -306,3 +306,7 @@ setMethod("backendMerge", "MsBackendHdf5Peaks", function(object, ...) {
     validObject(res)
     res
 })
+
+setMethod("backendParallelFactor", "MsBackendHdf5Peaks", function(object) {
+    factor(dataStorage(object), levels = unique(dataStorage(object)))
+})
diff --git a/R/MsBackendMzR.R b/R/MsBackendMzR.R
index a0286dab..74b00308 100644
--- a/R/MsBackendMzR.R
+++ b/R/MsBackendMzR.R
@@ -210,3 +210,7 @@ setMethod("export", "MsBackendMzR", function(object, x, file = tempfile(),
               MoreArgs = list(format = format, copy = copy),
               BPPARAM = BPPARAM)
 })
+
+setMethod("backendParallelFactor", "MsBackendMzR", function(object) {
+    factor(dataStorage(object), levels = unique(dataStorage(object)))
+})
diff --git a/R/Spectra-functions.R b/R/Spectra-functions.R
index 6f1302a0..1de4e3ba 100644
--- a/R/Spectra-functions.R
+++ b/R/Spectra-functions.R
@@ -77,11 +77,11 @@ NULL
 #' @param ... optional additional arguments to `FUN`.
 #'
 #' @param f `factor` or `vector` that can be coerced to one defining how the
-#'     data should be split for parallel processing. Set to `NULL` to disable
-#'     parallel processing.
+#'     data should be split for parallel processing. Set to `NULL` or
+#'     `factor()` to disable splitting and parallel processing.
 #'
-#' @param columns `character` defining the columns that should be returned. This
-#'     will be passed to the backend's `peaksData` function.
+#' @param columns `character` defining the columns that should be returned.
+#'     This will be passed to the backend's `peaksData` function.
 #'
 #' @param BPPARAM parallel processing setup.
 #'
@@ -91,11 +91,40 @@ NULL
 #'
 #' @importMethodsFrom BiocParallel bplapply
 #'
+#' @note
+#'
+#' Who's calling `.peaksapply`:
+#'
+#' In *Spectra.R*:
+#'
+#' - `peaksData`: would pass optional parameter `f` down.
+#' - `as,Spectra,list`.
+#' - `intensity`: would pass optional parameter `f` down.
+#' - `ionCount`.
+#' - `isCentroided`.
+#' - `isEmpty`.
+#' - `mz`: would pass optional parameter `f` down.
+#' - `lengths`.
+#' - `$`.
+#' - `bin`.
+#'
+#' In *Spectra-functions.R*:
+#'
+#' - `.compare_spectra_chunk`: has its own `chunkSize` parameter and ignores
+#'   any other splitting. It also does some internal chunking by
+#'   `dataStorage` that we might want to skip/disable (or use
+#'   `backendParallelFactor` instead).
+#' - `.compare_spectra_self`: does not do any splitting.
+#' - `.combine_spectra`: uses `f = x$dataStorage` to split. Needs to be fixed.
+#' - `.estimate_precursor_intensity`: disables parallel/chunk-wise processing,
+#'   but it is invoked with a splitting based on `dataOrigin`, so it should
+#'   be OK.
+#'
 #' @noRd
 .peaksapply <- function(object, FUN = NULL, ...,
                         spectraVariables = .processingQueueVariables(object),
-                        f = dataStorage(object),
-                        columns = c("mz", "intensity"), BPPARAM = bpparam()) {
+                        f = .parallel_processing_factor(object),
+                        columns = c("mz", "intensity"),
+                        BPPARAM = backendBpparam(object)) {
     len <- length(object)
     lf <- length(f)
     if (!len)
@@ -179,36 +208,44 @@ NULL
 #' @export applyProcessing
 #'
 #' @rdname Spectra
-applyProcessing <- function(object, f = dataStorage(object),
+applyProcessing <- function(object, f = processingChunkFactor(object),
                             BPPARAM = bpparam(), ...) {
-    if (!length(object@processingQueue))
+    queue <- object@processingQueue
+    if (!length(queue))
         return(object)
     if (isReadOnly(object@backend))
         stop(class(object@backend), " is read-only. 'applyProcessing' works ",
             "only with backends that support writing data.")
     BPPARAM <- backendBpparam(object@backend, BPPARAM)
-    if (!is.factor(f))
-        f <- factor(f, levels = unique(f))
-    if (length(f) != length(object))
-        stop("length 'f' has to be equal to the length of 'object' (",
-             length(object), ")")
+    svars <- .processingQueueVariables(object)
     pv <- peaksVariables(object)
-    bknds <- bplapply(
-        split(object@backend, f = f), function(z, queue, pv, svars) {
-            if (length(svars))
-                spd <- as.data.frame(spectraData(z, columns = svars))
-            else spd <- NULL
-            peaksData(z) <- .apply_processing_queue(
-                peaksData(z, columns = pv), spd, queue)
-            z
-        }, queue = object@processingQueue, pv = pv,
-        svars = .processingQueueVariables(object),
-        BPPARAM = BPPARAM)
-    bknds <- backendMerge(bknds)
-    if (is.unsorted(f))
-        bknds <- bknds[order(unlist(split(seq_along(bknds), f),
-                                    use.names = FALSE))]
-    object@backend <- bknds
+    if (length(f)) {
+        if (!is.factor(f))
+            f <- factor(f, levels = unique(f))
+        if (length(f) != length(object))
+            stop("length 'f' has to be equal to the length of 'object' (",
+                 length(object), ")")
+        bknds <- bplapply(
+            split(object@backend, f = f), function(z, queue, pv, svars) {
+                if (length(svars))
+                    spd <- as.data.frame(spectraData(z, columns = svars))
+                else spd <- NULL
+                peaksData(z) <- .apply_processing_queue(
+                    peaksData(z, columns = pv), spd, queue)
+                z
+            }, queue = queue, pv = pv, svars = svars, BPPARAM = BPPARAM)
+        bknds <- backendMerge(bknds)
+        if (is.unsorted(f))
+            bknds <- bknds[order(unlist(split(seq_along(bknds), f),
+                                        use.names = FALSE))]
+        object@backend <- bknds
+    } else {
+        if (length(svars))
+            spd <- as.data.frame(spectraData(object@backend, columns = svars))
+        else spd <- NULL
+        peaksData(object@backend) <- .apply_processing_queue(
+            peaksData(object@backend, columns = pv), spd, queue)
+    }
     object@processing <- .logging(object@processing,
                                   "Applied processing queue with ",
                                   length(object@processingQueue),
@@ -310,12 +347,12 @@ applyProcessing <- function(object, f = dataStorage(object),
     bppx <- backendBpparam(x@backend, BPPARAM)
     if (is(bppx, "SerialParam"))
         fx <- NULL
-    else fx <- factor(dataStorage(x))
+    else fx <- backendParallelFactor(x@backend)
     pqvx <- .processingQueueVariables(x)
     bppy <- backendBpparam(y@backend, BPPARAM)
     if (is(bppy, "SerialParam"))
         fy <- NULL
-    else fy <- factor(dataStorage(y))
+    else fy <- backendParallelFactor(y@backend)
     pqvy <- .processingQueueVariables(y)
     if (nx <= chunkSize && ny <= chunkSize) {
         mat <- .peaks_compare(
@@ -341,14 +378,14 @@ applyProcessing <- function(object, f = dataStorage(object),
     for (x_chunk in x_chunks) {
         x_buff <- x[x_chunk]
         if (length(fx))
-            fxc <- factor(dataStorage(x_buff))
+            fxc <- backendParallelFactor(x_buff@backend)
         else fxc <- NULL
         x_peaks <- .peaksapply(x_buff, f = fxc, spectraVariables = pqvx,
                                BPPARAM = bppx)
         for (y_chunk in y_chunks) {
             y_buff <- y[y_chunk]
             if (length(fy))
-                fyc <- factor(dataStorage(y_buff))
+                fyc <- backendParallelFactor(y_buff@backend)
             else fyc <- NULL
             mat[x_chunk, y_chunk] <- .peaks_compare(
                 x_peaks, .peaksapply(y_buff, f = fyc, spectraVariables = pqvy,
@@ -460,7 +497,8 @@ applyProcessing <- function(object, f = dataStorage(object),
     x_new <- setBackend(x_new, MsBackendMemory())
     bpp <- SerialParam()
     peaksData(x_new@backend) <- lapply(
-        split(.peaksapply(x, f = NULL, BPPARAM = bpp), f = f), FUN = FUN, ...)
+        split(.peaksapply(x, f = factor(), BPPARAM = bpp), f = f),
+        FUN = FUN, ...)
     x_new@processingQueue <- list()
     validObject(x_new)
     x_new
@@ -1008,3 +1046,138 @@ filterPrecursorPeaks <- function(object, tolerance = 0, ppm = 20,
                          object@processing, "Filter: remove peaks ", msg)
     object
 }
+
+#' @title Define a factor for parallel access to peaks data
+#'
+#' @description
+#'
+#' Define a factor to split a `Spectra` for parallel processing based on
+#' the parallel processing setup.
+#'
+#' If `processingChunkSize(x)` is defined and its value is smaller than
+#' `length(x)`, a factor based on this chunk size is returned.
+#' Otherwise `backendParallelFactor(x)` is returned, which is the optimal
+#' splitting for parallel processing suggested by the backend.
+#'
+#' Properties/considerations:
+#'
+#' - in-memory backends: don't split if not requested by the user (e.g. by
+#'   specifying `f` or `chunkSize`).
+#' - on-disk backends: file-based backends, such as `MsBackendMzR`: perform
+#'   per-file parallel processing if `f` or `chunkSize` is not defined.
+#'   Other on-disk backends: only if requested by the user.
+#'
+#' @param x `Spectra` object.
+#'
+#' @param chunkSize `integer` defining the size of chunks into which `x` should
+#'     be split.
+#'
+#' @return `factor` defining how to split `x` for chunk-wise
+#'     processing (or `factor()` to disable splitting).
+#'
+#' @author Johannes Rainer
+#'
+#' @noRd
+.parallel_processing_factor <- function(x, chunkSize = processingChunkSize(x)) {
+    lx <- length(x)
+    if (is.finite(chunkSize) && chunkSize < lx)
+        .chunk_factor(lx, chunkSize = chunkSize)
+    else backendParallelFactor(x@backend)
+}
+
+#' @title Parallel and chunk-wise processing of `Spectra`
+#'
+#' @description
+#'
+#' Many operations on `Spectra` objects, specifically those working with
+#' the actual MS data (peaks data), allow chunk-wise processing in which
+#' the `Spectra` is split into smaller parts (chunks) that are
+#' iteratively processed. This enables parallel processing of the data (by
+#' data chunk) and also reduces the memory demand since only the MS data
+#' of the currently processed subset is loaded into memory and processed.
+#' This chunk-wise processing, which is by default disabled, can be enabled
+#' by setting the processing chunk size of a `Spectra` with the
+#' `processingChunkSize` function to a value which is smaller than the
+#' length of the `Spectra` object. Setting `processingChunkSize(sps) <- 1000`
+#' will cause any data manipulation operation on `sps`, such as
+#' `filterIntensity` or `bin`, to be performed chunk-wise (and eventually
+#' in parallel) on sets of 1000 spectra in each iteration.
+#'
+#' Such chunk-wise processing is specifically useful for `Spectra` objects
+#' using an *on-disk* backend or for very large experiments. For small data
+#' sets or `Spectra` using an in-memory backend, a direct processing might
+#' however be more efficient. Setting the chunk size to `Inf` will disable
+#' the chunk-wise processing.
+#'
+#' For some backends a certain type of splitting and chunk-wise processing
+#' might be preferable. The `MsBackendMzR` backend for example needs to load
+#' the MS data from the original (mzML) files, hence chunk-wise processing
+#' on a per-file basis would be ideal. The [backendParallelFactor()] function
+#' for `MsBackend` allows backends to suggest a preferred splitting of the
+#' data by returning a `factor` defining the respective data chunks. The
+#' `MsBackendMzR` returns for example a `factor` based on the *dataStorage*
+#' spectra variable. A `factor` of length 0 is returned if no particular
+#' splitting is preferred. The suggested chunk definition
+#' will be used if no finite `processingChunkSize` is defined. Setting
+#' the `processingChunkSize` overrides `backendParallelFactor`.
+#'
+#' See the *Large-scale data handling and processing with Spectra* vignette
+#' for more information and examples.
+#'
+#' Functions to configure parallel or chunk-wise processing:
+#'
+#' - `processingChunkSize`: gets or sets the size of the chunks for
+#'   parallel or chunk-wise processing of a `Spectra` in general.
+#'   With a value of `Inf` (the default) no chunk-wise processing will be
+#'   performed.
+#'
+#' - `processingChunkFactor`: returns a `factor` defining the chunks into
+#'   which a `Spectra` will be split for chunk-wise (parallel) processing.
+#'   A `factor` of length 0 indicates that no chunk-wise processing will be
+#'   performed.
+#'
+#' @note
+#'
+#' Some backends might not support parallel processing at all.
+#' For these, the `backendBpparam` function will always return a
+#' `SerialParam()` regardless of how parallel processing was defined.
+#'
+#' @param x `Spectra`.
+#'
+#' @param value `integer(1)` defining the chunk size.
+#'
+#' @return `processingChunkSize` returns the currently defined processing
+#'     chunk size (or `Inf` if it is not defined). `processingChunkFactor`
+#'     returns a `factor` defining the chunks into which `x` will be split
+#'     for (parallel) chunk-wise processing or a `factor` of length 0 if
+#'     no splitting is defined.
+#'
+#' @author Johannes Rainer
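+#'
+#' @examples
+#'
+#' ## A minimal, illustrative example using an (empty) Spectra object:
+#' ## query and set the processing chunk size.
+#' sps <- Spectra()
+#' processingChunkSize(sps)
+#'
+#' ## Enable chunk-wise processing in chunks of 10000 spectra
+#' processingChunkSize(sps) <- 10000
+#' processingChunkSize(sps)
+#'
+#' ## The factor that would be used to split the data for processing
+#' processingChunkFactor(sps)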
+#'
+#' @export
+processingChunkSize <- function(x) {
+    if (.hasSlot(x, "processingChunkSize"))
+        x@processingChunkSize
+    else Inf
+}
+
+#' @rdname processingChunkSize
+#'
+#' @export
+`processingChunkSize<-` <- function(x, value) {
+    if (length(value) != 1L)
+        stop("'value' has to be of length 1")
+    if (!.hasSlot(x, "processingChunkSize"))
+        x <- updateObject(x)
+    x@processingChunkSize <- value
+    x
+}
+
+#' @rdname processingChunkSize
+#'
+#' @export
+processingChunkFactor <- function(x) {
+    if (!inherits(x, "Spectra"))
+        stop("'x' is supposed to be a 'Spectra' object")
+    .parallel_processing_factor(x)
+}
diff --git a/R/Spectra.R b/R/Spectra.R
index 976492bf..ef987705 100644
--- a/R/Spectra.R
+++ b/R/Spectra.R
@@ -29,6 +29,9 @@ NULL
 #' `applyProcessing` function. See the *Data manipulation and analysis
 #' methods* section below for more details.
 #'
+#' For more information on parallel or chunk-wise processing (especially
+#' helpful for very large data sets) see [processingChunkSize()].
+#'
 #' To apply arbitrary functions to a `Spectra` use the `spectrapply` function
 #' (or directly [chunkapply()] for chunk-wise processing). See description of
 #' the `spectrapply` function below for details.
@@ -749,38 +752,6 @@ NULL
 #' Parameter `msLevel.` allows to apply this to only spectra of certain MS
 #' level(s).
 #'
-#' @section Parallel processing:
-#'
-#' Some `Spectra` functions have build-in parallel processing that can be
-#' configured by passing the parallel processing setup with the `BPPARAM`
-#' function argument (which defaults to `BPPARAM = bpparam()`, thus uses
-#' the default set up). Most functions have an additional parameter `f` that
-#' allows to define how `Spectra` will be split to perform parallel processing.
-#' This parameter `f` defaults to `f = dataStorage(object)` and hence
-#' parallel processing is performed *by file* (if a file-based, on-disk
-#' backend such as `MsBackendMzR` is used). Some `MsBackend` classes might
-#' however not support parallel processing. The `backendBpparam` function
-#' allows to evaluate wheter a `Spectra` (respectively its `MsBackend`)
-#' supports a certain parallel processing setup. Calling
-#' `backendBpparam(sps, BPPARAM = MulticoreParam(3))` on a `Spectra` object
-#' `sps` would return `SerialParam()` in case the backend of the `Spectra`
-#' object does not support parallel processing. All functions listed below
-#' use this same function to eventually disable parallel processing to
-#' avoid failure of a function call.
-#'
-#' The functions with build-in parallel processing capabilities are:
-#'
-#' - `applyProcessing`.
-#' - `combineSpectra`.
-#' - `containsMz` (does not provide a parameter `f`, but performs parallel
-#'   processing separate for `dataStorage`).
-#' - `containsNeutralLoss` (same as `containsMz`).
-#' - `estimatePrecursorIntensity`.
-#' - `setBackend`.
-#' - `Spectra` (that passes the `BPPARAM` to the `backendInitialize` of the
-#'   used `MsBackend`).
-#' - `spectrapply`.
-#'
 #'
 #' @return See individual method description for the return value.
 #'
@@ -856,6 +827,8 @@ NULL
 #'     splitted. For `filterPrecursorScan`:
 #'     defining which spectra belong to the same original data file (sample).
 #'     Defaults to `f = dataOrigin(x)`.
+#'     For `intensity`, `mz` and `peaksData`: factor defining how data should
+#'     be chunk-wise loaded and processed. Defaults to [processingChunkFactor()].
 #'
 #' @param FUN For `addProcessing`: function to be applied to the peak matrix
 #'     of each spectrum in `object`. For `compareSpectra`: function to compare
@@ -1478,9 +1451,11 @@ setClass(
         processing = "character",
         ## metadata
         metadata = "list",
+        processingChunkSize = "numeric",
         version = "character"
     ),
-    prototype = prototype(version = "0.2")
+    prototype = prototype(version = "0.2",
+                          processingChunkSize = Inf)
 )

 setValidity("Spectra", function(object) {
@@ -1574,7 +1549,7 @@ setMethod("Spectra", "ANY", function(object, processingQueue = list(),
 #' @exportMethod setBackend
 setMethod(
     "setBackend", c("Spectra", "MsBackend"),
-    function(object, backend, f = dataStorage(object), ...,
+    function(object, backend, f = processingChunkFactor(object), ...,
              BPPARAM = bpparam()) {
         backend_class <- class(object@backend)
         BPPARAM <- backendBpparam(object@backend, BPPARAM)
@@ -1585,13 +1560,11 @@ setMethod(
             bknds <- backendInitialize(
                 backend, data = spectraData(object@backend), ...)
         } else {
-            f <- force(factor(f, levels = unique(f)))
-            if (length(f) != length(object))
-                stop("length of 'f' has to match the length of 'object'")
-            if (length(levels(f)) == 1L || inherits(BPPARAM, "SerialParam")) {
-                bknds <- backendInitialize(
-                    backend, data = spectraData(object@backend), ...)
-            } else {
+            if (!is.factor(f))
+                f <- force(factor(f, levels = unique(f)))
+            if (length(f) && length(levels(f)) > 1L) {
+                if (length(f) != length(object))
+                    stop("length of 'f' has to match the length of 'object'")
                 bknds <- bplapply(
                     split(object@backend, f = f),
                     function(z, ...) {
@@ -1605,6 +1578,9 @@ setMethod(
                 if (is.unsorted(f))
                     bknds <- bknds[order(unlist(split(seq_along(bknds), f),
                                                 use.names = FALSE))]
+            } else {
+                bknds <- backendInitialize(
+                    backend, data = spectraData(object@backend), ...)
+            }
         }
         object@backend <- bknds
@@ -1656,14 +1632,12 @@ setMethod("acquisitionNum", "Spectra", function(object)

 #' @rdname Spectra
 setMethod(
     "peaksData", "Spectra",
-    function(object, columns = c("mz", "intensity"), ..., BPPARAM = bpparam()) {
-        BPPARAM <- backendBpparam(object, BPPARAM)
-        if (is(BPPARAM, "SerialParam"))
-            f <- NULL
-        else f <- factor(dataStorage(object))
-        SimpleList(.peaksapply(object, f = f, columns = columns,
-                               BPPARAM = BPPARAM, ...))
-})
+    function(object, columns = c("mz", "intensity"),
+             f = processingChunkFactor(object), ..., BPPARAM = bpparam()) {
+        if (length(object@processingQueue) || length(f))
+            SimpleList(.peaksapply(object, columns = columns, f = f))
+        else SimpleList(peaksData(object@backend, columns = columns))
+    })

 #' @rdname Spectra
 setMethod("peaksVariables", "Spectra", function(object)
@@ -1671,11 +1645,7 @@ setMethod("peaksVariables", "Spectra", function(object)

 #' @importFrom methods setAs
 setAs("Spectra", "list", function(from, to) {
-    BPPARAM <- backendBpparam(from)
-    if (is(BPPARAM, "SerialParam"))
-        f <- NULL
-    else f <- factor(dataStorage(from))
-    .peaksapply(from, f = f, BPPARAM = BPPARAM)
+    .peaksapply(from)
 })

 setAs("Spectra", "SimpleList", function(from, to) {
@@ -1724,49 +1694,36 @@ setMethod("dropNaSpectraVariables", "Spectra", function(object) {
 })

 #' @rdname Spectra
-setMethod("intensity", "Spectra", function(object, ...) {
-    if (length(object@processingQueue))
-        NumericList(.peaksapply(object, FUN = function(z, ...) z[, 2], ...),
-                    compress = FALSE)
-    else intensity(object@backend) # Disables also parallel proc. (issue #44)
+setMethod("intensity", "Spectra", function(object,
+                                           f = processingChunkFactor(object),
+                                           ...) {
+    if (length(object@processingQueue) || length(f))
+        NumericList(.peaksapply(object, FUN = function(z, ...) 
z[, 2], + f = f, ...), compress = FALSE) + else intensity(object@backend) }) #' @rdname Spectra setMethod("ionCount", "Spectra", function(object) { - BPPARAM <- backendBpparam(object) - if (is(BPPARAM, "SerialParam")) - f <- NULL - else f <- factor(dataStorage(object)) if (length(object)) - unlist(.peaksapply(object, f = f, BPPARAM = BPPARAM, - FUN = function(pks, ...) - sum(pks[, 2], na.rm = TRUE)), - use.names = FALSE) + unlist(.peaksapply( + object, FUN = function(pks, ...) sum(pks[, 2], na.rm = TRUE)), + use.names = FALSE) else numeric() }) #' @rdname Spectra setMethod("isCentroided", "Spectra", function(object, ...) { - BPPARAM <- backendBpparam(object) - if (is(BPPARAM, "SerialParam")) - f <- NULL - else f <- factor(dataStorage(object)) if (length(object)) - unlist(.peaksapply(object, f = f, BPPARAM = BPPARAM, - FUN = .peaks_is_centroided), + unlist(.peaksapply(object, FUN = .peaks_is_centroided), use.names = FALSE) else logical() }) #' @rdname Spectra setMethod("isEmpty", "Spectra", function(x) { - BPPARAM <- backendBpparam(x) - if (is(BPPARAM, "SerialParam")) - f <- NULL - else f <- factor(dataStorage(x)) if (length(x)) - unlist(.peaksapply(x, f = f, BPPARAM = BPPARAM, - FUN = function(pks, ...) nrow(pks) == 0), + unlist(.peaksapply(x, FUN = function(pks, ...) nrow(pks) == 0), use.names = FALSE) else logical() }) @@ -1816,6 +1773,7 @@ setMethod("containsMz", "Spectra", function(object, mz = numeric(), return(rep(NA, length(object))) mz <- unique(sort(mz)) BPPARAM <- backendBpparam(object@backend, BPPARAM) + ## TODO: fix to use .peaksapply instead. if (is(BPPARAM, "SerialParam")) .has_mz(object, mz, tolerance = tolerance, ppm = ppm, condFun = cond_fun, parallel = BPPARAM) @@ -1836,6 +1794,7 @@ setMethod("containsNeutralLoss", "Spectra", function(object, neutralLoss = 0, tolerance = 0, ppm = 20, BPPARAM = bpparam()) { BPPARAM <- backendBpparam(object@backend, BPPARAM) + ## TODO: FIX me to use chunk size. if (is(BPPARAM, "SerialParam")) { .has_mz_each(object, precursorMz(object) - neutralLoss, tolerance = tolerance, ppm = ppm, parallel = BPPARAM) @@ -1879,19 +1838,21 @@ setMethod("length", "Spectra", function(x) length(x@backend)) setMethod("msLevel", "Spectra", function(object) msLevel(object@backend)) #' @rdname Spectra -setMethod("mz", "Spectra", function(object, ...) { - if (length(object@processingQueue)) - NumericList(.peaksapply(object, FUN = function(z, ...) z[, 1], ...), - compress = FALSE) - else mz(object@backend) # Disables also parallel processing (issue #44) +setMethod("mz", "Spectra", function(object, f = processingChunkFactor(object), + ...) { + if (length(object@processingQueue) || length(f)) + NumericList(.peaksapply(object, FUN = function(z, ...) z[, 1], + f = f, ...), compress = FALSE) + else mz(object@backend) }) #' @rdname Spectra #' #' @exportMethod lengths setMethod("lengths", "Spectra", function(x, use.names = FALSE) { + f <- .parallel_processing_factor(x) if (length(x)) { - if (length(x@processingQueue)) + if (length(x@processingQueue) || length(f)) unlist(.peaksapply(x, FUN = function(pks, ...) 
nrow(pks)), use.names = use.names) else lengths(x@backend, use.names = use.names) diff --git a/data/fft_spectrum.RData b/data/fft_spectrum.RData index e122b4a8..c4b47c09 100644 Binary files a/data/fft_spectrum.RData and b/data/fft_spectrum.RData differ diff --git a/man/MsBackend.Rd b/man/MsBackend.Rd index 3d6a1ba5..a728a05f 100644 --- a/man/MsBackend.Rd +++ b/man/MsBackend.Rd @@ -15,10 +15,13 @@ \alias{supportsSetBackend} \alias{backendBpparam} \alias{backendInitialize} +\alias{backendParallelFactor,MsBackendMzR-method} +\alias{backendParallelFactor,MsBackendHdf5Peaks-method} \alias{backendBpparam,MsBackend-method} \alias{backendInitialize,MsBackend-method} \alias{backendMerge,list-method} \alias{backendMerge,MsBackend-method} +\alias{backendParallelFactor,MsBackend-method} \alias{export,MsBackend-method} \alias{acquisitionNum,MsBackend-method} \alias{peaksData,MsBackend-method} @@ -104,6 +107,8 @@ \S4method{backendMerge}{MsBackend}(object, ...) +\S4method{backendParallelFactor}{MsBackend}(object, ...) + \S4method{export}{MsBackend}(object, ...) \S4method{acquisitionNum}{MsBackend}(object) @@ -476,7 +481,9 @@ by the backend. Backends not supporting parallel processing (e.g. because they contain a connection to a database that can not be shared across processes) should extend this method to return only \code{SerialParam()} and hence disable parallel processing for (most) -methods and functions. +methods and functions. See also \code{backendParallelFactor} for a +function to provide a preferred splitting of the backend for parallel +processing. \item \code{backendInitialize}: initialises the backend. This method is supposed to be called rights after creating an instance of the backend class and should prepare the backend (e.g. set the data @@ -494,6 +501,13 @@ spectra variable \code{dataStorage}. \item \code{backendMerge}: merges (combines) \code{MsBackend} objects into a single instance. All objects to be merged have to be of the same type (e.g. \code{\link[=MsBackendDataFrame]{MsBackendDataFrame()}}). +\item \code{backendParallelFactor}: returns a \code{factor} defining an optimal +(preferred) way how the backend can be split for parallel processing +used for all peak data accessor or data manipulation functions. +The default implementation returns a factor of length 0 (\code{factor()}) +providing thus no default splitting. A \code{backendParallelFactor} for +\code{MsBackendMzR} on the other hand returns \code{factor(dataStorage(object))} +hence suggesting to split the object by data file. \item \code{dataOrigin}: gets a \code{character} of length equal to the number of spectra in \code{object} with the \emph{data origin} of each spectrum. This could e.g. be the mzML file from which the data was read. diff --git a/man/Spectra.Rd b/man/Spectra.Rd index 2c22a0d2..2152d640 100644 --- a/man/Spectra.Rd +++ b/man/Spectra.Rd @@ -106,7 +106,12 @@ \alias{combinePeaks,Spectra-method} \title{The Spectra class to manage and access MS data} \usage{ -applyProcessing(object, f = dataStorage(object), BPPARAM = bpparam(), ...) +applyProcessing( + object, + f = processingChunkFactor(object), + BPPARAM = bpparam(), + ... +) concatenateSpectra(x, ...) @@ -189,7 +194,13 @@ filterPrecursorPeaks( BPPARAM = bpparam() ) -\S4method{setBackend}{Spectra,MsBackend}(object, backend, f = dataStorage(object), ..., BPPARAM = bpparam()) +\S4method{setBackend}{Spectra,MsBackend}( + object, + backend, + f = processingChunkFactor(object), + ..., + BPPARAM = bpparam() +) \S4method{c}{Spectra}(x, ...) 
@@ -199,7 +210,13 @@ filterPrecursorPeaks( \S4method{acquisitionNum}{Spectra}(object) -\S4method{peaksData}{Spectra}(object, columns = c("mz", "intensity"), ..., BPPARAM = bpparam()) +\S4method{peaksData}{Spectra}( + object, + columns = c("mz", "intensity"), + f = processingChunkFactor(object), + ..., + BPPARAM = bpparam() +) \S4method{peaksVariables}{Spectra}(object) @@ -219,7 +236,7 @@ filterPrecursorPeaks( \S4method{dropNaSpectraVariables}{Spectra}(object) -\S4method{intensity}{Spectra}(object, ...) +\S4method{intensity}{Spectra}(object, f = processingChunkFactor(object), ...) \S4method{ionCount}{Spectra}(object) @@ -269,7 +286,7 @@ filterPrecursorPeaks( \S4method{msLevel}{Spectra}(object) -\S4method{mz}{Spectra}(object, ...) +\S4method{mz}{Spectra}(object, f = processingChunkFactor(object), ...) \S4method{lengths}{Spectra}(x, use.names = FALSE) @@ -477,7 +494,9 @@ For \code{combineSpectra}: \code{factor} defining the grouping of the spectra th should be combined. For \code{spectrapply}: \code{factor} how \code{object} should be splitted. For \code{filterPrecursorScan}: defining which spectra belong to the same original data file (sample). -Defaults to \code{f = dataOrigin(x)}.} +Defaults to \code{f = dataOrigin(x)}. +For \code{intensity}, \code{mz} and \code{peaksData}: factor defining how data should +be chunk-wise loaded an processed. Defaults to \code{\link[=processingChunkFactor]{processingChunkFactor()}}.} \item{BPPARAM}{Parallel setup configuration. See \code{\link[=bpparam]{bpparam()}} for more information. This is passed directly to the \code{\link[=backendInitialize]{backendInitialize()}} method @@ -759,6 +778,9 @@ and \code{\link[=MsBackendHdf5Peaks]{MsBackendHdf5Peaks()}}) it is possible to a \code{applyProcessing} function. See the \emph{Data manipulation and analysis methods} section below for more details. +For more information on parallel or chunk-wise processing (especially +helpful for very large data sets) see \code{\link[=processingChunkSize]{processingChunkSize()}}. + To apply arbitrary functions to a \code{Spectra} use the \code{spectrapply} function (or directly \code{\link[=chunkapply]{chunkapply()}} for chunk-wise processing). See description of the \code{spectrapply} function below for details. @@ -1399,41 +1421,6 @@ level(s). } } -\section{Parallel processing}{ - - -Some \code{Spectra} functions have build-in parallel processing that can be -configured by passing the parallel processing setup with the \code{BPPARAM} -function argument (which defaults to \code{BPPARAM = bpparam()}, thus uses -the default set up). Most functions have an additional parameter \code{f} that -allows to define how \code{Spectra} will be split to perform parallel processing. -This parameter \code{f} defaults to \code{f = dataStorage(object)} and hence -parallel processing is performed \emph{by file} (if a file-based, on-disk -backend such as \code{MsBackendMzR} is used). Some \code{MsBackend} classes might -however not support parallel processing. The \code{backendBpparam} function -allows to evaluate wheter a \code{Spectra} (respectively its \code{MsBackend}) -supports a certain parallel processing setup. Calling -\code{backendBpparam(sps, BPPARAM = MulticoreParam(3))} on a \code{Spectra} object -\code{sps} would return \code{SerialParam()} in case the backend of the \code{Spectra} -object does not support parallel processing. All functions listed below -use this same function to eventually disable parallel processing to -avoid failure of a function call. 
- -The functions with build-in parallel processing capabilities are: -\itemize{ -\item \code{applyProcessing}. -\item \code{combineSpectra}. -\item \code{containsMz} (does not provide a parameter \code{f}, but performs parallel -processing separate for \code{dataStorage}). -\item \code{containsNeutralLoss} (same as \code{containsMz}). -\item \code{estimatePrecursorIntensity}. -\item \code{setBackend}. -\item \code{Spectra} (that passes the \code{BPPARAM} to the \code{backendInitialize} of the -used \code{MsBackend}). -\item \code{spectrapply}. -} -} - \examples{ ## Create a Spectra providing a `DataFrame` containing the spectrum data. diff --git a/man/hidden_aliases.Rd b/man/hidden_aliases.Rd index 8b72d1e8..127c07e1 100644 --- a/man/hidden_aliases.Rd +++ b/man/hidden_aliases.Rd @@ -8,6 +8,7 @@ \alias{[,MsBackendDataFrame-method} \alias{ppm} \alias{backendMerge} +\alias{backendParallelFactor} \alias{bin,numeric-method} \alias{containsMz} \alias{containsNeutralLoss} @@ -172,6 +173,8 @@ \usage{ backendMerge(object, ...) +backendParallelFactor(object, ...) + \S4method{bin}{numeric}( x, y, diff --git a/man/processingChunkSize.Rd b/man/processingChunkSize.Rd new file mode 100644 index 00000000..88c2dbcc --- /dev/null +++ b/man/processingChunkSize.Rd @@ -0,0 +1,82 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/Spectra-functions.R +\name{processingChunkSize} +\alias{processingChunkSize} +\alias{processingChunkSize<-} +\alias{processingChunkFactor} +\title{Parallel and chunk-wise processing of \code{Spectra}} +\usage{ +processingChunkSize(x) + +processingChunkSize(x) <- value + +processingChunkFactor(x) +} +\arguments{ +\item{x}{\code{Spectra}.} + +\item{value}{\code{integer(1)} defining the chunk size.} +} +\value{ +\code{processingChunkSize} returns the currently defined processing +chunk size (or \code{Inf} if it is not defined). \code{processingChunkFactor} +returns a \code{factor} defining the chunks into which \code{x} will be split +for (parallel) chunk-wise processing or a \code{factor} of length 0 if +no splitting is defined. +} +\description{ +Many operations on \code{Spectra} objects, specifically those working with +the actual MS data (peaks data), allow a chunk-wise processing in which +the \code{Spectra} is splitted into smaller parts (chunks) that are +iteratively processed. This enables parallel processing of the data (by +data chunk) and also reduces the memory demand since only the MS data +of the currently processed subset is loaded into memory and processed. +This chunk-wise processing, which is by default disabled, can be enabled +by setting the processing chunk size of a \code{Spectra} with the +\code{processingChunkSize} function to a value which is smaller than the +length of the \code{Spectra} object. Setting \code{processingChunkSize(sps) <- 1000} +will cause any data manipulation operation on the \code{sps}, such as +\code{filterIntensity} or \code{bin}, to be performed eventually in parallel for +sets of 1000 spectra in each iteration. + +Such chunk-wise processing is specifically useful for \code{Spectra} objects +using an \emph{on-disk} backend or for very large experiments. For small data +sets or \code{Spectra} using an in-memory backend, a direct processing might +however be more efficient. Setting the chunk size to \code{Inf} will disable +the chunk-wise processing. + +For some backends a certain type of splitting and chunk-wise processing +might be preferable. 
The \code{MsBackendMzR} backend for example needs to load +the MS data from the original (mzML) files, hence chunk-wise processing +on a per-file basis would be ideal. The \code{\link[=backendParallelFactor]{backendParallelFactor()}} function +for \code{MsBackend} allows backends to suggest a preferred splitting of the +data by returning a \code{factor} defining the respective data chunks. The +\code{MsBackendMzR} returns for example a \code{factor} based on the \emph{dataStorage} +spectra variable. A \code{factor} of length 0 is returned if no particular +preferred splitting should be performed. The suggested chunk definition +will be used if no finite \code{processingChunkSize} is defined. Setting +the \code{processingChunkSize} overrides \code{backendParallelFactor}. + +See the \emph{Large-scale data handling and processing with Spectra} for more +information and examples. + +Functions to configure parallel or chunk-wise processing: +\itemize{ +\item \code{processingChunkSize}: allows to get or set the size of the chunks for +parallel processing or chunk-wise processing of a \code{Spectra} in general. +With a value of \code{Inf} (the default) no chunk-wise processing will be +performed. +\item \code{processingChunkFactor}: returns a \code{factor} defining the chunks into +which a \code{Spectra} will be split for chunk-wise (parallel) processing. +A \code{factor} of length 0 indicates that no chunk-wise processing will be +performed. +} +} +\note{ +Some backends might not support parallel processing at all. +For these, the \code{backendBpparam} function will always return a +\code{SerialParam()} independently on how parallel processing was defined. +} +\author{ +Johannes Rainer +} diff --git a/tests/testthat/test_MsBackend.R b/tests/testthat/test_MsBackend.R index 99e579e9..3d3f7e28 100644 --- a/tests/testthat/test_MsBackend.R +++ b/tests/testthat/test_MsBackend.R @@ -71,3 +71,7 @@ test_that("backendBpparam,MsBackend works", { res <- backendBpparam(sciex_mzr, SerialParam()) expect_equal(res, SerialParam()) }) + +test_that("backendParallelFactor,MsBackend works", { + expect_equal(backendParallelFactor(MsBackendMemory()), factor()) +}) diff --git a/tests/testthat/test_MsBackendHdf5Peaks.R b/tests/testthat/test_MsBackendHdf5Peaks.R index e317412f..b7afdf37 100644 --- a/tests/testthat/test_MsBackendHdf5Peaks.R +++ b/tests/testthat/test_MsBackendHdf5Peaks.R @@ -403,3 +403,9 @@ test_that("dropNaSpectraVariables works with MsBackendHdf5", { expect_true(length(spectraVariables(res)) < length(spectraVariables(sciex_hd5))) }) + +test_that("backendParallelFactor,MsBackendHdf5Peaks", { + expect_equal(backendParallelFactor(sciex_hd5), + factor(dataStorage(sciex_hd5), + levels = unique(dataStorage(sciex_hd5)))) +}) diff --git a/tests/testthat/test_MsBackendMzR.R b/tests/testthat/test_MsBackendMzR.R index c66c4283..ff891738 100644 --- a/tests/testthat/test_MsBackendMzR.R +++ b/tests/testthat/test_MsBackendMzR.R @@ -564,3 +564,9 @@ test_that("dropNaSpectraVariables works with MsBackendMzR", { test_that("supportsSetBackend,MsBackendMzR", { expect_false(supportsSetBackend(MsBackendMzR())) }) + +test_that("backendParallelFactor,MsBackendMzR", { + expect_equal(backendParallelFactor(sciex_mzr), + factor(dataStorage(sciex_mzr), + levels = unique(dataStorage(sciex_mzr)))) +}) diff --git a/tests/testthat/test_Spectra-functions.R b/tests/testthat/test_Spectra-functions.R index a4491be1..ec73a72f 100644 --- a/tests/testthat/test_Spectra-functions.R +++ b/tests/testthat/test_Spectra-functions.R @@ -833,3 +833,31 
@@ test_that("filterPrecursorPeaks,Spectra works", {
                      lengths(x)[msLevel(x) > 1L]))
     expect_equal(lengths(res)[msLevel(x) == 1L], lengths(x)[msLevel(x) == 1L])
 })
+
+test_that("processingChunkSize works", {
+    s <- Spectra()
+    expect_equal(processingChunkSize(s), Inf)
+    processingChunkSize(s) <- 1000
+    expect_equal(processingChunkSize(s), 1000)
+    expect_error(processingChunkSize(s) <- c(1, 2), "length 1")
+    expect_error(processingChunkSize(s) <- "A", "character")
+})
+
+test_that("processingChunkFactor works", {
+    s <- Spectra()
+    expect_equal(processingChunkFactor(s), factor())
+    tmp <- Spectra(sciex_mzr)
+
+    expect_equal(length(processingChunkFactor(tmp)), length(tmp))
+    expect_true(is.factor(processingChunkFactor(tmp)))
+
+    processingChunkSize(tmp) <- 1000
+    res <- processingChunkFactor(tmp)
+    expect_true(is.factor(res))
+    expect_true(length(res) == length(tmp))
+    expect_equal(levels(res), c("1", "2"))
+
+    expect_equal(.parallel_processing_factor(tmp), processingChunkFactor(tmp))
+
+    expect_error(processingChunkFactor("a"), "Spectra")
+})
diff --git a/tests/testthat/test_Spectra.R b/tests/testthat/test_Spectra.R
index e22462cc..37b655bb 100644
--- a/tests/testthat/test_Spectra.R
+++ b/tests/testthat/test_Spectra.R
@@ -1776,3 +1776,10 @@ test_that("combinePeaks,Spectra works", {
                            msLevel = 1L, spectrumMsLevel = 1L)
     expect_equal(res_1, peaksData(res)[[1L]])
 })
+
+test_that("processingChunkSize works", {
+    expect_equal(processingChunkSize(sps_dia), Inf)
+    tmp <- sps_dia
+    processingChunkSize(tmp) <- 10
+    expect_equal(processingChunkSize(tmp), 10)
+})
diff --git a/vignettes/MsBackend.Rmd b/vignettes/MsBackend.Rmd
index 40253623..ce0158b5 100644
--- a/vignettes/MsBackend.Rmd
+++ b/vignettes/MsBackend.Rmd
@@ -96,12 +96,21 @@ General conventions for MS data of a `Spectra` are:

 ## Notes on parallel processing

-The default parallel processing of `Spectra` splits backends by their
-`dataStorage` and performs the processing in parallel on the subsets. Not all
-backends will support such parallel processing (or any parallel processing) and
-thus unexpected errors might occur (depending on the user's settings). A new
-function allowing to evaluate the parallel processing capabilities of a backend
-will be added to help avoiding such issues.
+For parallel processing, `Spectra` splits the backend based on a defined
+`factor` and processes each chunk in parallel (or *in serial* if a
+`SerialParam` is used). The splitting `factor` can be defined for a `Spectra`
+by setting its `processingChunkSize`. Alternatively, through the
+`backendParallelFactor` function the backend can also *suggest* a `factor` that
+should/could be used for splitting and parallel processing. The default
+implementation of `backendParallelFactor` returns an empty `factor`
+(`factor()`), hence not suggesting any preferred
+splitting. `backendParallelFactor` for `MsBackendMzR` on the other hand returns
+a `factor` based on the data files the data is stored in (i.e. based on the
+`dataStorage` of the MS data).
+
+Besides enabling parallel processing, chunk-wise processing also reduces the
+memory demand of operations, because only the peaks data of the current chunk
+needs to be realized in memory.

 # API
@@ -1562,6 +1571,20 @@ setMethod("backendBpparam", signature = "MsBackend",
 ```

+### `backendParallelFactor`
+
+The `backendParallelFactor` method allows a backend to suggest a preferred
+splitting of the backend for parallel processing. See also the notes on
+parallel processing above for more information. The default implementation
+returns `factor()` (i.e. a `factor` of length 0), hence not suggesting any
+splitting:
+
+```{r}
+setMethod("backendParallelFactor", "MsBackend", function(object, ...) {
+    factor()
+})
+```
+
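+A file-based backend can instead return a `factor` along the files the data is
+stored in. For illustration, this is the `MsBackendMzR` implementation added
+in this package, which suggests splitting by `dataStorage` (i.e. by data
+file):
+
+```{r}
+setMethod("backendParallelFactor", "MsBackendMzR", function(object) {
+    factor(dataStorage(object), levels = unique(dataStorage(object)))
+})
+```
+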
 ### `dropNaSpectraVariables`

 The `dropNaSpectraVariables` is supposed to allow removing all spectra variables
diff --git a/vignettes/Spectra-large-scale.Rmd b/vignettes/Spectra-large-scale.Rmd
new file mode 100644
index 00000000..89f96c5e
--- /dev/null
+++ b/vignettes/Spectra-large-scale.Rmd
@@ -0,0 +1,242 @@
+---
+title: "Large-scale data handling and processing with Spectra"
+output:
+  BiocStyle::html_document:
+    toc_float: true
+vignette: >
+  %\VignetteIndexEntry{Large-scale data handling and processing with Spectra}
+  %\VignetteEngine{knitr::rmarkdown}
+  %\VignetteEncoding{UTF-8}
+  %\VignettePackage{Spectra}
+  %\VignetteDepends{Spectra,mzR,BiocStyle,msdata}
+bibliography: references.bib
+---
+
+```{r style, echo = FALSE, results = 'asis', message=FALSE}
+BiocStyle::markdown()
+```
+
+**Package**: `r Biocpkg("Spectra")`
+**Authors**: `r packageDescription("Spectra")[["Author"]] `
+**Last modified:** `r file.info("Spectra-large-scale.Rmd")$mtime`
+**Compiled**: `r date()`
+
+```{r, echo = FALSE, message = FALSE}
+library(Spectra)
+library(BiocStyle)
+```
+
+
+# Introduction
+
+The `r Biocpkg("Spectra")` package supports handling and processing of very
+large mass spectrometry (MS) data sets. Through dedicated backends, which only
+load MS data when requested/needed, the memory demand can be minimized.
+Examples of such backends are the `MsBackendMzR` and the `MsBackendOfflineSql`
+(defined in the `r Biocpkg("MsBackendSql")` package). In addition, `Spectra`
+supports chunk-wise data processing, in which only parts of the data are loaded
+into memory and processed at a time. In this document we provide information on
+how large-scale data can best be processed with the *Spectra* package.
+
+
+# Memory requirements of different data representations
+
+The *Spectra* package separates functionality to process and analyze MS data
+(implemented for the `Spectra` class) from the code that defines how and where
+the MS data is stored. For the latter, different implementations of the
+`MsBackend` class are available that are either optimized for performance (such
+as the `MsBackendMemory` and `MsBackendDataFrame`) or for low memory
+requirements (such as the `MsBackendMzR`, or the `MsBackendOfflineSql`
+implemented in the `r Biocpkg("MsBackendSql")` package, which through its
+minimal memory footprint enables the analysis of even very large data sets).
+Below we load MS data from 4 test files into a `Spectra` using a
+`MsBackendMzR` backend.
+
+```{r}
+library(Spectra)
+
+#' Define the file names from which to import the data
+fls <- c(
+    system.file("TripleTOF-SWATH", "PestMix1_DDA.mzML", package = "msdata"),
+    system.file("TripleTOF-SWATH", "PestMix1_SWATH.mzML", package = "msdata"),
+    system.file("sciex", "20171016_POOL_POS_1_105-134.mzML",
+                package = "msdata"),
+    system.file("sciex", "20171016_POOL_POS_3_105-134.mzML",
+                package = "msdata"))
+
+#' Creating a Spectra object representing the MS data
+sps_mzr <- Spectra(fls, source = MsBackendMzR())
+sps_mzr
+```
+
+The resulting `Spectra` uses a `MsBackendMzR` for data representation. This
+backend only loads general spectra data into memory, while the *full* MS data
+(i.e., the *m/z* and intensity values of the individual mass peaks) is only
+loaded when requested or needed. The memory footprint of this backend is thus
+lower than that of an *in-memory* backend.
+
+Below we create a `Spectra` that keeps the full data in memory by changing the
+backend to a `MsBackendMemory` backend and compare the sizes of both objects.
+
+```{r}
+sps_mem <- setBackend(sps_mzr, MsBackendMemory())
+
+print(object.size(sps_mzr), units = "MB")
+print(object.size(sps_mem), units = "MB")
+```
+
+Keeping the full data in memory thus requires considerably more memory.
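+
+Note that the full peaks data stays accessible also with the on-disk backend;
+it is simply imported from the raw files on demand. As a quick, illustrative
+check we extract the m/z values of the first two spectra, which triggers such
+an on-demand import:
+
+```{r}
+#' m/z values are read from the original files only when requested
+mz(sps_mzr[1:2])
+```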
+
+We next disable parallel processing for *Spectra* to allow an unbiased
+estimation of memory usage.
+
+```{r}
+#' Disable parallel processing globally
+register(SerialParam())
+```
+
+
+# Chunk-wise and parallel processing
+
+Operations on peaks data are the most time and memory demanding tasks. These
+generally apply a function to, or modify, the *m/z* and/or intensity
+values. Among these functions are for example functions that filter, remove or
+combine mass peaks (such as `filterMzRange`, `filterIntensity` or
+`combinePeaks`), functions that perform calculations on the peaks data (such
+as `bin` or `pickPeaks`) and also functions that provide information on, or
+summarize, spectra (such as `lengths` or `ionCount`). For all these functions
+the peaks data needs to be present in memory, and on-disk backends such as the
+`MsBackendMzR` thus need to first import the data from their data
+storage. Loading the full peaks data at once into memory might however not be
+possible for large data sets. Loading and processing the data in smaller
+chunks, in contrast, reduces the memory demand and hence allows processing
+even such data sets. For the `MsBackendMzR` and `MsBackendHdf5Peaks` backends
+the data is automatically split and processed by the data storage files. For
+all other backends chunk-wise processing can be enabled by defining the
+`processingChunkSize` of a `Spectra`, i.e. the number of spectra for which
+peaks data should be loaded and processed in each iteration. The
+`processingChunkFactor` function can be used to evaluate how the data will be
+split. Below we use this function to evaluate how chunk-wise processing would
+be performed with the two `Spectra` objects.
+
+```{r}
+processingChunkFactor(sps_mem)
+```
+
+For the `Spectra` with the in-memory backend an empty `factor()` is returned;
+thus, no chunk-wise processing will be performed. We next evaluate whether the
+`Spectra` with the `MsBackendMzR` on-disk backend would use chunk-wise
+processing.
+
+```{r}
+processingChunkFactor(sps_mzr) |> table()
+```
+
+The data would thus be split and processed by the original file, from which the
+data is imported. We next specifically define the chunk size for both `Spectra`
+with the `processingChunkSize` function.
+
+```{r}
+processingChunkSize(sps_mem) <- 3000
+processingChunkFactor(sps_mem) |> table()
+```
+
+After setting the chunk size, also the `Spectra` with the in-memory backend
+would use chunk-wise processing. We repeat with the other `Spectra` object:
+
+```{r}
+processingChunkSize(sps_mzr) <- 3000
+processingChunkFactor(sps_mzr) |> table()
+```
+
+The `Spectra` with the `MsBackendMzR` backend would now split the data into
+approximately equally sized arbitrary chunks, and no longer by original data
+file. Setting `processingChunkSize` thus overrides any splitting suggested by
+the backend.
+
+After having set a `processingChunkSize`, any operation involving peaks data
+will by default be performed in a chunk-wise manner. Thus, calling `ionCount`
+on our `Spectra` will now split the data into chunks of 3000 spectra and sum
+the intensities (per spectrum) chunk by chunk.
+
+```{r}
+tic <- ionCount(sps_mem)
+```
+
+While chunk-wise processing reduces the memory demand of operations, the
+splitting and merging of the data and results can negatively impact
+performance. Thus, small data sets or `Spectra` with in-memory backends
+will generally not benefit from this type of processing. For computationally
+intense operations, on the other hand, chunk-wise processing has the advantage
+that chunks can (and will) be processed in parallel (depending on the parallel
+processing setup).
+
+Note that this chunk-wise processing only affects functions that involve actual
+peak data. Subset operations that only reduce the number of spectra (such as
+`filterRt` or `[`) bypass this mechanism and are applied immediately to the
+data.
+
+For an evaluation of chunk-wise processing see also this
+[issue](https://github.com/rformassspectrometry/Spectra/issues/304#issuecomment-1825699576)
+on the *Spectra* GitHub repository.
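+
+To see the chunks being processed in parallel, we could for example register a
+parallel processing setup and repeat the `ionCount` call. A minimal sketch
+(assuming a Unix-like system where forking is available; on Windows a
+`SnowParam()` would be used instead):
+
+```{r, eval = FALSE}
+#' Enable parallel processing with 2 separate R processes
+register(MulticoreParam(2))
+
+#' Peaks data is now loaded and summed chunk by chunk, in parallel
+tic <- ionCount(sps_mzr)
+
+#' Restore serial processing for the rest of this document
+register(SerialParam())
+```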
+
+
+# Notes and suggestions for parallel or chunk-wise processing
+
+- Estimating memory usage in R tends to be difficult, but for MS data sets with
+  more than about 100 samples or whenever processing tends to take longer than
+  expected it is suggested to enable chunk-wise processing (if not already
+  used, as with `MsBackendMzR`).
+
+- `Spectra` uses the `r Biocpkg("BiocParallel")` package for parallel
+  processing. The parallel processing setup can be configured globally by
+  *registering* the preferred setup using the `register` function (e.g.
+  `register(SnowParam(4))` to use socket-based parallel processing on Windows
+  using 4 different R processes). Parallel processing can be disabled by
+  setting `register(SerialParam())`.
+
+- Chunk-wise processing will by default run in parallel, depending on the
+  configured parallel processing setup.
+
+- Parallel processing (and also chunk-wise processing) has a computational
+  overhead, because the data needs to be split and merged. Thus, for some
+  operations or data sets avoiding this mechanism can be more efficient
+  (e.g. for in-memory backends or small data sets).
+
+
+# `Spectra` functions supporting or using parallel processing
+
+Some functions allow configuring parallel processing using a dedicated
+parameter that defines how to split the data for parallel (or chunk-wise)
+processing. These functions are:
+
+- `applyProcessing`: parameter `f` (defaults to `processingChunkFactor(object)`)
+  can be used to define how to split and process the data in parallel.
+- `combineSpectra`: parameter `p` (defaults to `x$dataStorage`) defines how the
+  data should be split and processed in parallel.
+- `estimatePrecursorIntensity`: parameter `f` (defaults to `dataOrigin(x)`)
+  defines the splitting and processing. This should represent the original data
+  files the spectra data derives from.
+- `intensity`: parameter `f` (defaults to `processingChunkFactor(object)`)
+  defines if and how the data should be split for parallel processing.
+- `mz`: parameter `f` (defaults to `processingChunkFactor(object)`)
+  defines if and how the data should be split for parallel processing.
+- `peaksData`: parameter `f` (defaults to `processingChunkFactor(object)`)
+  defines if and how the data should be split for parallel processing.
+- `setBackend`: parameter `f` (defaults to `processingChunkFactor(object)`)
+  defines if and how the data should be split for parallel processing.
+
+Functions that perform chunk-wise (parallel) processing *natively*, i.e., based
+on the `processingChunkFactor`:
+
+- `containsMz`.
+- `containsNeutralLoss`.
+- `ionCount`.
+- `isCentroided`.
+- `isEmpty`.
+
+
+# Session information
+
+```{r si}
+sessionInfo()
+```
diff --git a/vignettes/Spectra.Rmd b/vignettes/Spectra.Rmd
index 1065d665..0bf839e7 100644
--- a/vignettes/Spectra.Rmd
+++ b/vignettes/Spectra.Rmd
@@ -41,7 +41,9 @@ This vignette provides general examples and descriptions for the *Spectra*
 package. Additional information and tutorials are available, such as
 [SpectraTutorials](https://jorainer.github.io/SpectraTutorials/),
 [MetaboAnnotationTutorials](https://jorainer.github.io/MetaboAnnotationTutorials),
-or also in [@rainer_modular_2022].
+or also in [@rainer_modular_2022]. For information on how to handle and
+(parallel) process large-scale data sets see the *Large-scale data handling and
+processing with Spectra* vignette.

 # Installation
@@ -956,7 +958,7 @@ them based on similar intensity values. 
Spectra binning ensures that the binned m/z values are comparable across all spectra. Below we bin our spectra using a bin size of 0.1 (i.e. all peaks with an m/z smaller than 0.1 are aggregated into one binned peak. Below, we explicitly set `zero.rm = FALSE` to retain all bins -generated by the function, including those with an intensity of zero. +generated by the function, including those with an intensity of zero. ```{r} sps_bin <- Spectra::bin(sps, binSize = 0.1, zero.rm = FALSE) @@ -1101,38 +1103,6 @@ head(dataStorage(sps_sciex)) head(basename(dataOrigin(sps_sciex))) ``` -## Parallel processing notes - -Most functions on `Spectra` support (and use) parallel processing out of the -box. Peak data access and manipulation methods perform by default parallel -processing on a per-file basis (i.e. using the `dataStorage` variable as -splitting factor). `Spectra` uses `r Biocpkg("BiocParallel")` for -parallel processing and all functions use the default registered parallel -processing setup of that package (see `?bpparam()` for more information). - -Note however that some backends (e.g. those using connections to SQL databases -such as `MsBackendMassbank`, `MsBackendSql`) do not support parallel processing -because the database connection can not be shared across processes. The -`backendBpparam` function allows to evaluate whether a `Spectra` object (or in -fact its used `MsBackend` class) supports a certain parallel processing setup. -Calling `backendBpparam` on a `Spectra` with a backend that does not support -parallel processing would (should) return `SerialParam()`, always. Below we -create an empty `Spectra` object and test whether that supports parallel -processing. - -```{r} -tmp <- Spectra() -backendBpparam(tmp) -``` - -The backend (in this case a `MsBackendMemory` thus supports parallel processing. -We can also specifically test this for individual backends (and parallel -setups): - -```{r} -backendBpparam(MsBackendDataFrame(), SnowParam(2)) -``` - # Backends