Skip to content

Set max consecutive futile launches #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Suppress interactive browser on Windows which launched on each worker previously (@psychelzh).
* Migrate to the new host/daemon nomenclature in `mirai` 0.9.1 (#96).
* Suppress `status()` retries (@shikokuchuo, #100).
* Implement `launch_max` to error out if workers repeatedly launch without completing any tasks (#101, @shikokuchuo, @multimeric).

# crew 0.4.0

Expand Down
6 changes: 4 additions & 2 deletions R/crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ crew_controller_local <- function(
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
garbage_collection = FALSE,
launch_max = 5L
) {
client <- crew_client(
name = name,
Expand All @@ -57,7 +58,8 @@ crew_controller_local <- function(
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection
garbage_collection = garbage_collection,
launch_max = launch_max
)
controller <- crew_controller(client = client, launcher = launcher)
controller$validate()
Expand Down
44 changes: 41 additions & 3 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@
#' because packages sometimes rely on options they set at loading time.
#' @param garbage_collection `TRUE` to run garbage collection between
#' tasks, `FALSE` to skip.
#' @param launch_max Positive integer of length 1, maximum allowed
#' consecutive launch attempts which do not complete any tasks.
#' Enforced on a worker-by-worker basis.
#' The futile launch count resets back to 0
#' for each worker that completes a task.
#' It is recommended to set `launch_max` above 0
#' because sometimes workers are unproductive under perfectly ordinary
#' circumstances. But `launch_max` should still be small enough
#' to detect errors in the underlying platform.
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
Expand All @@ -71,7 +80,8 @@ crew_launcher <- function(
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
garbage_collection = FALSE,
launch_max = 5L
) {
name <- as.character(name %|||% crew_random_name())
crew_class_launcher$new(
Expand Down Expand Up @@ -138,6 +148,8 @@ crew_class_launcher <- R6::R6Class(
reset_options = NULL,
#' @field garbage_collection See [crew_launcher()].
garbage_collection = NULL,
#' @field launch_max See [crew_launcher()].
launch_max = NULL,
#' @field until Numeric of length 1, time point when throttled unlocks.
until = NULL,
#' @description Launcher constructor.
Expand All @@ -154,6 +166,7 @@ crew_class_launcher <- R6::R6Class(
#' @param reset_packages See [crew_launcher()].
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param launch_max See [crew_launcher()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
Expand All @@ -178,7 +191,8 @@ crew_class_launcher <- R6::R6Class(
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL
garbage_collection = NULL,
launch_max = NULL
) {
self$name <- name
self$seconds_interval <- seconds_interval
Expand All @@ -192,6 +206,7 @@ crew_class_launcher <- R6::R6Class(
self$reset_packages <- reset_packages
self$reset_options <- reset_options
self$garbage_collection <- garbage_collection
self$launch_max <- launch_max
},
#' @description Validate the launcher.
#' @return `NULL` (invisibly).
Expand Down Expand Up @@ -232,7 +247,8 @@ crew_class_launcher <- R6::R6Class(
"seconds_wall",
"seconds_exit",
"tasks_max",
"tasks_timers"
"tasks_timers",
"launch_max"
)
for (field in fields) {
crew_assert(
Expand All @@ -259,7 +275,9 @@ crew_class_launcher <- R6::R6Class(
"socket",
"start",
"launches",
"futile",
"launched",
"history",
"assigned",
"complete"
)
Expand Down Expand Up @@ -341,7 +359,9 @@ crew_class_launcher <- R6::R6Class(
socket = sockets,
start = rep(NA_real_, n),
launches = rep(0L, n),
futile = rep(0L, n),
launched = rep(FALSE, n),
history = rep(0L, n),
assigned = rep(0L, n),
complete = rep(0L, n)
)
Expand Down Expand Up @@ -466,6 +486,22 @@ crew_class_launcher <- R6::R6Class(
worker = index,
instance = instance
)
history <- self$workers$history[index]
complete <- self$workers$complete[index]
futile <- self$workers$futile[index]
futile <- if_any(complete > history, 0L, futile + 1L)
crew_assert(
futile <= self$launch_max,
message = paste(
"{crew} worker",
index,
"launched",
self$launch_max,
"times in a row without completing any tasks. Either raise",
"launch_max or troubleshoot your platform to figure out",
"why {crew} workers are not launching or connecting."
)
)
handle <- self$launch_worker(
call = as.character(call),
name = as.character(name),
Expand All @@ -477,7 +513,9 @@ crew_class_launcher <- R6::R6Class(
self$workers$socket[index] <- socket
self$workers$start[index] <- nanonext::mclock() / 1000
self$workers$launches[index] <- self$workers$launches[index] + 1L
self$workers$futile[index] <- futile
self$workers$launched[index] <- TRUE
self$workers$history[index] <- complete
invisible()
},
#' @description Throttle repeated calls.
Expand Down
6 changes: 4 additions & 2 deletions R/crew_launcher_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ crew_launcher_local <- function(
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
garbage_collection = FALSE,
launch_max = 5L
) {
name <- as.character(name %|||% crew_random_name())
launcher <- crew_class_launcher_local$new(
Expand All @@ -42,7 +43,8 @@ crew_launcher_local <- function(
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection
garbage_collection = garbage_collection,
launch_max = launch_max
)
launcher$validate()
launcher
Expand Down
7 changes: 6 additions & 1 deletion man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion man/crew_controller_local.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion man/crew_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion man/crew_launcher_local.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions tests/interactive/test-launch_max.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Interactive test of `launch_max`: checks that the per-worker `futile`
# counter grows with each launch that follows no newly completed task,
# resets to 0 once a task has completed, and that going past `launch_max`
# raises an error of class "crew_error".
# Run manually and watch htop.
# Pause between launches to allow workers to idle out.
library(crew)
library(testthat)
# One worker, a near-zero idle timeout (presumably so that each launched
# worker exits before doing any work), and at most 3 futile launches.
x <- crew_controller_local(
workers = 1L,
seconds_idle = 1e-9,
launch_max = 3L
)
x$start()
# The counter is 0 before any launch has been requested.
expect_equal(x$launcher$workers$futile, 0L)
x$launch(n = 1L)
# Pause until worker idles out.
# First launch with no completed task on record: count is 1.
expect_equal(x$launcher$workers$futile, 1L)
x$launch(n = 1L)
# Pause until worker idles out.
# Second consecutive launch with no completed task: count is 2.
expect_equal(x$launcher$workers$futile, 2L)
# Remove the idle timeout (presumably so the next worker lives long enough
# to run a task), then submit one task and wait for it to finish.
x$launcher$seconds_idle <- Inf
x$push(TRUE)
x$wait()
# The counter is updated at launch time, before the task finishes, so it
# reads 3 here. NOTE(review): assumes the launch is triggered on behalf of
# the pushed task rather than by an explicit launch() call -- confirm.
expect_equal(x$launcher$workers$futile, 3L)
x$launcher$terminate(1L)
# Pause until worker exits.
# Restore the tiny idle timeout for the remaining launches.
x$launcher$seconds_idle <- 1e-9
x$launch(n = 1L)
# Pause until worker exits.
# A task completed since the previous launch, so the count resets to 0.
expect_equal(x$launcher$workers$futile, 0L)
x$launch(n = 1L)
# Pause until worker exits.
# From here on no further tasks complete, so the count climbs again: 1 ...
expect_equal(x$launcher$workers$futile, 1L)
x$launch(n = 1L)
# Pause until worker exits.
# ... 2 ...
expect_equal(x$launcher$workers$futile, 2L)
x$launch(n = 1L)
# Pause until worker exits.
# ... 3, which equals launch_max and is still allowed.
expect_equal(x$launcher$workers$futile, 3L)
# One more futile launch would push the count past launch_max, so it must
# fail with a classed crew error.
expect_error(x$launch(n = 1L), class = "crew_error")
x$terminate()
6 changes: 4 additions & 2 deletions tests/launchers/test-launcher-system2.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ crew_controller_system2 <- function(
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
garbage_collection = FALSE,
launch_max = 5L
) {
client <- crew::crew_client(
name = name,
Expand All @@ -57,7 +58,8 @@ crew_controller_system2 <- function(
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection
garbage_collection = garbage_collection,
launch_max = launch_max
)
controller <- crew::crew_controller(client = client, launcher = launcher)
controller$validate()
Expand Down
8 changes: 6 additions & 2 deletions tests/testthat/test-crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ crew_test("launcher start()", {
"socket",
"start",
"launches",
"futile",
"launched",
"history",
"assigned",
"complete"
)
Expand Down Expand Up @@ -298,7 +300,8 @@ crew_test("custom launcher", {
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
garbage_collection = FALSE,
launch_max = 5L
) {
client <- crew::crew_client(
name = name,
Expand All @@ -322,7 +325,8 @@ crew_test("custom launcher", {
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection
garbage_collection = garbage_collection,
launch_max = launch_max
)
controller <- crew::crew_controller(
client = client,
Expand Down
8 changes: 5 additions & 3 deletions vignettes/plugins.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Every `launch_worker()` method must accept arguments `call`, `launcher`, `worker

To see what the `call` object looks like, create a new launcher and run the `call()` method.

```{r, eval = TRUE}
```{r, eval = TRUE, message = FALSE}
library(crew)
launcher <- crew_launcher_local()
launcher$call(
Expand Down Expand Up @@ -135,7 +135,8 @@ crew_controller_custom <- function(
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
garbage_collection = FALSE,
launch_max = 5L
) {
client <- crew::crew_client(
name = name,
Expand All @@ -159,7 +160,8 @@ crew_controller_custom <- function(
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection
garbage_collection = garbage_collection,
launch_max = launch_max
)
controller <- crew::crew_controller(client = client, launcher = launcher)
controller$validate()
Expand Down