Skip to content

Commit

Permalink
All parallel backends now prevent nested parallelization, unless expl…
Browse files Browse the repository at this point in the history
…icitly allowed.

This allowed us to drop getExpression() for MultisessionFuture, and argument
'mc.cores' from getExpression() for MulticoreFuture.
  • Loading branch information
HenrikBengtsson committed Jan 2, 2025
1 parent 6d8b2af commit a953a2e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 27 deletions.
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ S3method(futures,listenv)
S3method(getExpression,ClusterFuture)
S3method(getExpression,Future)
S3method(getExpression,MulticoreFuture)
S3method(getExpression,MultisessionFuture)
S3method(getExpression,UniprocessFuture)
S3method(globals,Future)
S3method(journal,Future)
Expand Down
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Version (development version)

* ...
## New Features

* All parallel backends now prevent nested parallelization, unless
explicitly allowed, e.g. settings recognized by
`parallelly::availableCores()` or set by the future
`plan()`. Previously, this had to be implemented by each backend,
but now it's handled automatically by the future framework.


# Version 1.34.0 [2024-07-29]
Expand Down
10 changes: 7 additions & 3 deletions R/MulticoreFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ run.MulticoreFuture <- function(future, ...) {
FutureRegistry(reg, action = "add", future = future, earlySignal = TRUE)

future.args <- list(expr)
job <- do.call(parallel::mcparallel, args = future.args, envir = envir)
job <- local({
oopts <- options(mc.cores = NULL)
on.exit(options(oopts))
do.call(parallel::mcparallel, args = future.args, envir = envir)
})

future$job <- job
future$state <- "running"
Expand Down Expand Up @@ -299,7 +303,7 @@ result.MulticoreFuture <- function(future, ...) {

#' @export
getExpression.MulticoreFuture <- local({
function(future, expr = future$expr, mc.cores = 1L, immediateConditions = TRUE, ...) {
function(future, expr = future$expr, immediateConditions = TRUE, ...) {
## Assert that no arguments but the first is passed by position
assert_no_positional_args_but_first()

Expand Down Expand Up @@ -329,6 +333,6 @@ getExpression.MulticoreFuture <- local({
}
} ## if (resignalImmediateConditions && immediateConditions)

NextMethod(expr = expr, mc.cores = mc.cores, immediateConditions = immediateConditions, threads = threads)
NextMethod(expr = expr, immediateConditions = immediateConditions, threads = threads)
}
})
22 changes: 0 additions & 22 deletions R/MultisessionFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,3 @@ MultisessionFuture <- function(expr = NULL, substitute = TRUE, envir = parent.fr
future <- structure(future, class = c("MultisessionFuture", class(future)))
future
}


#' @export
getExpression.MultisessionFuture <- function(future, mc.cores = 1L, ...) {
## NOTE: In order to override the default 'mc.cores = NULL' of
## getExpression.Future(), we have to pass it as a named argument to
## NextMethod(). If not done, that is, if we just call NextMethod(), then
## 'mc.cores' will resolve to the default (= NULL). If we don't name the
## argument - NextMethod("getExpression", mc.cores) - then the default
## will still be NULL.
## The problem with using NextMethod(mc.cores = mc.cores) is that if we
## call getExpression(f, 2L) instead of getExpression(f, mc.cores = 2L),
## then the call will become getExpression.Future(f, 2L, mc.cores = 2L).
## I don't think there is a solution here, except to enforce that all
## arguments to a method that uses NextMethod() must be named.
## See also https://github.com/HenrikBengtsson/Wishlist-for-R/issues/44
## /HB 2018-06-13

## Assert that no arguments but the first is passed by position
assert_no_positional_args_but_first()
NextMethod(mc.cores = mc.cores)
}
34 changes: 34 additions & 0 deletions R/expressions.R
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,43 @@ evalFuture <- function(
}
}

## Limit nested parallelization
## (a) Identify default number of cores - ignoring plan settings
...future.ncores <- base::local({
ans <- NA_integer_

base::options(parallelly.availableCores.fallback = 1L)
ncores <- parallelly::availableCores(which = "all")
ncores <- ncores[ncores != ncores["system"]]
ncores <- ncores[base::setdiff(base::names(ncores), base::c("_R_CHECK_LIMIT_CORES_", "Bioconductor"))]
if (base::length(ncores) > 0) {
if (base::length(ncores) > 1) {
ncores <- ncores[base::setdiff(base::names(ncores), "fallback")]
}
if (base::length(ncores) > 0) {
ans <- base::min(ncores, na.rm = TRUE)
}
}
ans
})

## Use the next-level-down ("popped") future strategy
future::plan(strategiesR, .cleanup = FALSE, .init = FALSE)

if (!is.na(...future.ncores)) {
## (b) Identify default number of cores - acknowledging plan settings
...future.ncores <- base::local({
nworkers <- future::nbrOfWorkers()
base::min(base::c(nworkers, ...future.ncores), na.rm = TRUE)
})
}

if (!is.na(...future.ncores)) {
...future.options.ncores <- base::options(mc.cores = ...future.ncores)
base::on.exit(base::options(...future.options.ncores), add = TRUE)
}


## Set RNG seed?
if (is.numeric(seed)) {
genv <- globalenv()
Expand Down

0 comments on commit a953a2e

Please sign in to comment.