From 18a8b28d48b46cd3683b6eb6f9f5249eccd9a7e7 Mon Sep 17 00:00:00 2001 From: wlandau Date: Wed, 29 Mar 2023 12:25:53 -0400 Subject: [PATCH] Fix #51 --- NEWS.md | 3 +- R/crew_launcher.R | 116 +++++++++++++++------------- man/crew_class_launcher.Rd | 16 ++-- man/crew_class_launcher_callr.Rd | 2 +- tests/testthat/test-crew_launcher.R | 6 +- 5 files changed, 75 insertions(+), 68 deletions(-) diff --git a/NEWS.md b/NEWS.md index eb210329..864dce81 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,8 +5,7 @@ * Remove the superfluous `clean()` controller method. * Clean up logic for `scale()` and `launch()` controller methods. * Add a new `inactive()` launcher method to help with the above. -* Reduce superfluous worker launches using cleaner controller logic (#51). -* Eliminate the remaining superfluous worker launches with a last-minute check for an existing connection in the launcher (#51). +* Eliminate superfluous worker launches: clean up the controller logic, and check worker discoveries before worker connections (#51). * Simplify the launcher plugin interface (#54). # crew 0.0.5 diff --git a/R/crew_launcher.R b/R/crew_launcher.R index 23699eb1..01aca331 100644 --- a/R/crew_launcher.R +++ b/R/crew_launcher.R @@ -61,17 +61,17 @@ #' crew_session_terminate() #' } crew_launcher <- function( - name = NULL, - seconds_launch = 30, - seconds_interval = 0.001, - seconds_timeout = 10, - seconds_idle = Inf, - seconds_wall = Inf, - seconds_exit = 0.1, - tasks_max = Inf, - tasks_timers = 0L, - async_dial = TRUE, - cleanup = FALSE + name = NULL, + seconds_launch = 30, + seconds_interval = 0.001, + seconds_timeout = 10, + seconds_idle = Inf, + seconds_wall = Inf, + seconds_exit = 0.1, + tasks_max = Inf, + tasks_timers = 0L, + async_dial = TRUE, + cleanup = FALSE ) { name <- as.character(name %|||% random_name()) launcher <- crew_class_launcher_callr$new( @@ -115,28 +115,24 @@ crew_class_launcher <- R6::R6Class( cloneable = FALSE, portable = TRUE, private = list( - which_active = function() { + which_launching = function() { bound <- self$seconds_launch start <- self$workers$start now <- nanonext::mclock() / 1000 - launching <- !is.na(start) & ((now - start) < bound) + !is.na(start) & ((now - start) < bound) + }, + which_active = function() { listeners <- self$workers$listener - listening <- map_lgl(listeners, connection_opened) - connected <- map_lgl(listeners, dialer_connected) - not_discovered <- !connected - i <- listening & not_discovered & launching - not_discovered[i] <- map_lgl(listeners[i], dialer_not_discovered) - listening & (connected | (launching & not_discovered)) + launching <- private$which_launching() + map_lgl( + seq_along(listeners), + ~is_active(listeners[[.x]], launching[.x]) + ) }, - which_unreachable = function() { - bound <- self$seconds_launch - start <- self$workers$start - now <- nanonext::mclock() / 1000 - not_launching <- is.na(start) | ((now - start) > bound) + which_lost = function() { listeners <- self$workers$listener - listening <- map_lgl(listeners, connection_opened) - not_discovered <- map_lgl(listeners, dialer_not_discovered) - listening & not_launching & not_discovered + launching <- private$which_launching() + map_lgl(seq_along(listeners), ~is_lost(listeners[[.x]], launching[.x])) } ), public = list( @@ -200,17 +196,17 @@ crew_class_launcher <- R6::R6Class( #' crew_session_terminate() #' } initialize = function( - name = NULL, - seconds_launch = NULL, - seconds_interval = NULL, - seconds_timeout = NULL, - seconds_idle = NULL, - seconds_wall = NULL, - seconds_exit = NULL, - tasks_max = NULL, - tasks_timers = NULL, - async_dial = NULL, - cleanup = NULL + name = NULL, + seconds_launch = NULL, + seconds_interval = NULL, + seconds_timeout = NULL, + seconds_idle = NULL, + seconds_wall = NULL, + seconds_exit = NULL, + tasks_max = NULL, + tasks_timers = NULL, + async_dial = NULL, + cleanup = NULL ) { self$name <- name self$seconds_launch <- seconds_launch @@ -345,18 +341,18 @@ crew_class_launcher <- R6::R6Class( inactive = function() { as.character(self$workers$socket[!private$which_active()]) }, - #' @description Get the unreachable workers. - #' @details A worker is unreachable if it was supposed to launch, + #' @description Get the lost workers. + #' @details A worker is lost if it was supposed to launch, #' but it never connected to the client, #' and its startup window elapsed. #' @return Character vector of worker websockets. - unreachable = function() { - as.character(self$workers$socket[private$which_unreachable()]) + lost = function() { + as.character(self$workers$socket[private$which_lost()]) }, - #' @description Terminate unreachable workers. + #' @description Terminate lost workers. #' @return `NULL` (invisibly) clean = function() { - self$terminate(sockets = self$unreachable()) + self$terminate(sockets = self$lost()) }, #' @description Launch one or more workers. #' @details If a worker is already assigned to a socket, @@ -387,16 +383,14 @@ crew_class_launcher <- R6::R6Class( token = token, name = self$name ) - if (!dialer_connected(self$workers$listener[[index]])) { - handle <- self$launch_worker( - call = call, - name = self$name, - token = token - ) - self$workers$listener[[index]] <- listener - self$workers$handle[[index]] <- handle - self$workers$launches[[index]] <- self$workers$launches[[index]] + 1L - } + handle <- self$launch_worker( + call = call, + name = self$name, + token = token + ) + self$workers$listener[[index]] <- listener + self$workers$handle[[index]] <- handle + self$workers$launches[[index]] <- self$workers$launches[[index]] + 1L } invisible() }, @@ -437,3 +431,17 @@ crew_class_launcher <- R6::R6Class( } ) ) + +is_active <- function(listener, launching) { + connection_opened(listener) && if_any( + dialer_discovered(listener), + dialer_connected(listener), + launching + ) +} + +is_lost <- function(listener, launching) { + connection_opened(listener) && + (!launching) && + dialer_not_discovered(listener) +} diff --git a/man/crew_class_launcher.Rd b/man/crew_class_launcher.Rd index 6b468e6f..23ed4a5f 100644 --- a/man/crew_class_launcher.Rd +++ b/man/crew_class_launcher.Rd @@ -99,7 +99,7 @@ Other launchers: \item \href{#method-crew_class_launcher-populate}{\code{crew_class_launcher$populate()}} \item \href{#method-crew_class_launcher-active}{\code{crew_class_launcher$active()}} \item \href{#method-crew_class_launcher-inactive}{\code{crew_class_launcher$inactive()}} -\item \href{#method-crew_class_launcher-unreachable}{\code{crew_class_launcher$unreachable()}} +\item \href{#method-crew_class_launcher-lost}{\code{crew_class_launcher$lost()}} \item \href{#method-crew_class_launcher-clean}{\code{crew_class_launcher$clean()}} \item \href{#method-crew_class_launcher-launch}{\code{crew_class_launcher$launch()}} \item \href{#method-crew_class_launcher-terminate}{\code{crew_class_launcher$terminate()}} @@ -325,16 +325,16 @@ Character vector of worker websockets. } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-crew_class_launcher-unreachable}{}}} -\subsection{Method \code{unreachable()}}{ -Get the unreachable workers. +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-crew_class_launcher-lost}{}}} +\subsection{Method \code{lost()}}{ +Get the lost workers. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{crew_class_launcher$unreachable()}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{crew_class_launcher$lost()}\if{html}{\out{
}} } \subsection{Details}{ -A worker is unreachable if it was supposed to launch, +A worker is lost if it was supposed to launch, but it never connected to the client, and its startup window elapsed. } @@ -347,7 +347,7 @@ Character vector of worker websockets. \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-crew_class_launcher-clean}{}}} \subsection{Method \code{clean()}}{ -Terminate unreachable workers. +Terminate lost workers. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{crew_class_launcher$clean()}\if{html}{\out{
}} } diff --git a/man/crew_class_launcher_callr.Rd b/man/crew_class_launcher_callr.Rd index 080cbcfb..b1cadaf2 100644 --- a/man/crew_class_launcher_callr.Rd +++ b/man/crew_class_launcher_callr.Rd @@ -50,10 +50,10 @@ Other launchers:
  • crew::crew_class_launcher$inactive()
  • crew::crew_class_launcher$initialize()
  • crew::crew_class_launcher$launch()
  • +
  • crew::crew_class_launcher$lost()
  • crew::crew_class_launcher$populate()
  • crew::crew_class_launcher$settings()
  • crew::crew_class_launcher$terminate()
  • -
  • crew::crew_class_launcher$unreachable()
  • crew::crew_class_launcher$validate()
  • diff --git a/tests/testthat/test-crew_launcher.R b/tests/testthat/test-crew_launcher.R index f71af4d3..bb230154 100644 --- a/tests/testthat/test-crew_launcher.R +++ b/tests/testthat/test-crew_launcher.R @@ -98,7 +98,7 @@ crew_test("launcher populate()", { expect_equal(workers$handle, list(crew_null, crew_null)) }) -crew_test("launcher active(), inactive(), and unreachable()", { +crew_test("launcher active(), inactive(), and lost()", { skip_on_cran() launcher <- crew_class_launcher$new(seconds_launch = 1) port_mirai <- free_port() @@ -111,7 +111,7 @@ crew_test("launcher active(), inactive(), and unreachable()", { dialers <- list() expect_equal(launcher$active(), character(0L)) expect_equal(launcher$inactive(), launcher$workers$socket) - expect_equal(launcher$unreachable(), character(0L)) + expect_equal(launcher$lost(), character(0L)) for (index in seq_len(9L)) { token <- launcher$workers$token[index] listener <- connection_listen( @@ -157,7 +157,7 @@ crew_test("launcher active(), inactive(), and unreachable()", { ) crew_wait( ~identical( - sort(as.character(launcher$unreachable())), + sort(as.character(launcher$lost())), sort(sprintf("ws://127.0.0.1:%s/%s", port_mirai, c(1L, 2L))) ), seconds_interval = 0.001,