Skip to content

Commit

Permalink
Fix #51
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Mar 29, 2023
1 parent 5a1f823 commit 18a8b28
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 68 deletions.
3 changes: 1 addition & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
* Remove the superfluous `clean()` controller method.
* Clean up logic for `scale()` and `launch()` controller methods.
* Add a new `inactive()` launcher method to help with the above.
* Reduce superfluous worker launches using cleaner controller logic (#51).
* Eliminate the remaining superfluous worker launches with a last-minute check for an existing connection in the launcher (#51).
* Eliminate superfluous worker launches: clean up the controller logic, and check worker discoveries before worker connections (#51).
* Simplify the launcher plugin interface (#54).

# crew 0.0.5
Expand Down
116 changes: 62 additions & 54 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@
#' crew_session_terminate()
#' }
crew_launcher <- function(
name = NULL,
seconds_launch = 30,
seconds_interval = 0.001,
seconds_timeout = 10,
seconds_idle = Inf,
seconds_wall = Inf,
seconds_exit = 0.1,
tasks_max = Inf,
tasks_timers = 0L,
async_dial = TRUE,
cleanup = FALSE
name = NULL,
seconds_launch = 30,
seconds_interval = 0.001,
seconds_timeout = 10,
seconds_idle = Inf,
seconds_wall = Inf,
seconds_exit = 0.1,
tasks_max = Inf,
tasks_timers = 0L,
async_dial = TRUE,
cleanup = FALSE
) {
name <- as.character(name %|||% random_name())
launcher <- crew_class_launcher_callr$new(
Expand Down Expand Up @@ -115,28 +115,24 @@ crew_class_launcher <- R6::R6Class(
cloneable = FALSE,
portable = TRUE,
private = list(
which_active = function() {
which_launching = function() {
bound <- self$seconds_launch
start <- self$workers$start
now <- nanonext::mclock() / 1000
launching <- !is.na(start) & ((now - start) < bound)
!is.na(start) & ((now - start) < bound)
},
which_active = function() {
listeners <- self$workers$listener
listening <- map_lgl(listeners, connection_opened)
connected <- map_lgl(listeners, dialer_connected)
not_discovered <- !connected
i <- listening & not_discovered & launching
not_discovered[i] <- map_lgl(listeners[i], dialer_not_discovered)
listening & (connected | (launching & not_discovered))
launching <- private$which_launching()
map_lgl(
seq_along(listeners),
~is_active(listeners[[.x]], launching[.x])
)
},
which_unreachable = function() {
bound <- self$seconds_launch
start <- self$workers$start
now <- nanonext::mclock() / 1000
not_launching <- is.na(start) | ((now - start) > bound)
which_lost = function() {
listeners <- self$workers$listener
listening <- map_lgl(listeners, connection_opened)
not_discovered <- map_lgl(listeners, dialer_not_discovered)
listening & not_launching & not_discovered
launching <- private$which_launching()
map_lgl(seq_along(listeners), ~is_lost(listeners[[.x]], launching[.x]))
}
),
public = list(
Expand Down Expand Up @@ -200,17 +196,17 @@ crew_class_launcher <- R6::R6Class(
#' crew_session_terminate()
#' }
initialize = function(
name = NULL,
seconds_launch = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
seconds_idle = NULL,
seconds_wall = NULL,
seconds_exit = NULL,
tasks_max = NULL,
tasks_timers = NULL,
async_dial = NULL,
cleanup = NULL
name = NULL,
seconds_launch = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
seconds_idle = NULL,
seconds_wall = NULL,
seconds_exit = NULL,
tasks_max = NULL,
tasks_timers = NULL,
async_dial = NULL,
cleanup = NULL
) {
self$name <- name
self$seconds_launch <- seconds_launch
Expand Down Expand Up @@ -345,18 +341,18 @@ crew_class_launcher <- R6::R6Class(
inactive = function() {
as.character(self$workers$socket[!private$which_active()])
},
#' @description Get the unreachable workers.
#' @details A worker is unreachable if it was supposed to launch,
#' @description Get the lost workers.
#' @details A worker is lost if it was supposed to launch,
#' but it never connected to the client,
#' and its startup window elapsed.
#' @return Character vector of worker websockets.
unreachable = function() {
as.character(self$workers$socket[private$which_unreachable()])
lost = function() {
as.character(self$workers$socket[private$which_lost()])
},
#' @description Terminate unreachable workers.
#' @description Terminate lost workers.
#' @return `NULL` (invisibly)
clean = function() {
self$terminate(sockets = self$unreachable())
self$terminate(sockets = self$lost())
},
#' @description Launch one or more workers.
#' @details If a worker is already assigned to a socket,
Expand Down Expand Up @@ -387,16 +383,14 @@ crew_class_launcher <- R6::R6Class(
token = token,
name = self$name
)
if (!dialer_connected(self$workers$listener[[index]])) {
handle <- self$launch_worker(
call = call,
name = self$name,
token = token
)
self$workers$listener[[index]] <- listener
self$workers$handle[[index]] <- handle
self$workers$launches[[index]] <- self$workers$launches[[index]] + 1L
}
handle <- self$launch_worker(
call = call,
name = self$name,
token = token
)
self$workers$listener[[index]] <- listener
self$workers$handle[[index]] <- handle
self$workers$launches[[index]] <- self$workers$launches[[index]] + 1L
}
invisible()
},
Expand Down Expand Up @@ -437,3 +431,17 @@ crew_class_launcher <- R6::R6Class(
}
)
)

is_active <- function(listener, launching) {
connection_opened(listener) && if_any(
dialer_discovered(listener),
dialer_connected(listener),
launching
)
}

is_lost <- function(listener, launching) {
connection_opened(listener) &&
(!launching) &&
dialer_not_discovered(listener)
}
16 changes: 8 additions & 8 deletions man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/crew_class_launcher_callr.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions tests/testthat/test-crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ crew_test("launcher populate()", {
expect_equal(workers$handle, list(crew_null, crew_null))
})

crew_test("launcher active(), inactive(), and unreachable()", {
crew_test("launcher active(), inactive(), and lost()", {
skip_on_cran()
launcher <- crew_class_launcher$new(seconds_launch = 1)
port_mirai <- free_port()
Expand All @@ -111,7 +111,7 @@ crew_test("launcher active(), inactive(), and unreachable()", {
dialers <- list()
expect_equal(launcher$active(), character(0L))
expect_equal(launcher$inactive(), launcher$workers$socket)
expect_equal(launcher$unreachable(), character(0L))
expect_equal(launcher$lost(), character(0L))
for (index in seq_len(9L)) {
token <- launcher$workers$token[index]
listener <- connection_listen(
Expand Down Expand Up @@ -157,7 +157,7 @@ crew_test("launcher active(), inactive(), and unreachable()", {
)
crew_wait(
~identical(
sort(as.character(launcher$unreachable())),
sort(as.character(launcher$lost())),
sort(sprintf("ws://127.0.0.1:%s/%s", port_mirai, c(1L, 2L)))
),
seconds_interval = 0.001,
Expand Down

0 comments on commit 18a8b28

Please sign in to comment.