diff --git a/DESCRIPTION b/DESCRIPTION index 4b3ca559b..824b48356 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.12.0.9007 +Version: 0.12.0.9008 Description: Lightweight parallel code execution and distributed computing. Designed for simplicity, a 'mirai' evaluates an R expression asynchronously, on local or network resources, resolving automatically upon completion. diff --git a/NEWS.md b/NEWS.md index 2cb59875f..a50d30d08 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,6 @@ -# mirai 0.12.0.9007 (development) +# mirai 0.12.0.9008 (development) -* Dispatcher sync (and status) timeouts widened to 10s to allow for launching large numbers of daemons. +* Dispatcher initial sync timeout widened to 10s to allow for launching large numbers of daemons. * Default for `ssh_config()` argument 'timeout' widened to 10 (seconds). * Fixes `daemons()` specifying 'output = FALSE' registering as TRUE instead. * Fixes use of `everywhere()` specifying '.args' as an unnamed list or '.expr' as a language object. diff --git a/R/daemons.R b/R/daemons.R index 500445fe0..6365903ff 100644 --- a/R/daemons.R +++ b/R/daemons.R @@ -286,7 +286,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., urld <- local_url() urlc <- strcat(urld, "c") sock <- req_socket(urld, resend = 0L) - sockc <- req_socket(urlc, resend = 0L) + sockc <- socket(protocol = "pair", listen = urlc) launch_and_sync_daemon(sock, wa5(urld, dots, n, urlc, url), output, tls, pass) || stop(._[["sync_timeout"]]) init_monitor(sockc = sockc, envir = envir) || stop(._[["sync_timeout"]]) } else { @@ -329,7 +329,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., if (dispatcher) { sock <- req_socket(urld, resend = 0L) urlc <- strcat(urld, "c") - sockc <- req_socket(urlc, resend = 0L) + sockc <- socket(protocol = "pair", listen = urlc) launch_and_sync_daemon(sock, wa4(urld, dots, envir[["stream"]], n, urlc), output) || stop(._[["sync_timeout"]]) for (i in seq_len(n)) next_stream(envir) init_monitor(sockc = sockc, envir = envir) || stop(._[["sync_timeout"]]) @@ -551,11 +551,11 @@ launch_and_sync_daemon <- function(sock, args, output, tls = NULL, pass = NULL) } } launch_daemon(args, output) - until(cv, .timelimit) + until(cv, .limit_long) } init_monitor <- function(sockc, envir) { - res <- query_dispatcher(sockc, command = FALSE, mode = 2L) + res <- query_dispatcher(sockc, command = FALSE, mode = 2L, block = .limit_long) valid <- !is.object(res) if (valid) `[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L])) valid diff --git a/R/dispatcher.R b/R/dispatcher.R index a09e01373..f6cff78d0 100644 --- a/R/dispatcher.R +++ b/R/dispatcher.R @@ -139,11 +139,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE, ctrchannel <- is.character(monitor) if (ctrchannel) { - sockc <- socket(protocol = "rep") + sockc <- socket(protocol = "pair") on.exit(reap(sockc), add = TRUE, after = FALSE) pipe_notify(sockc, cv = cv, remove = TRUE, flag = TRUE) dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial) - recv(sockc, mode = 6L, block = .timelimit) && stop(._[["sync_timeout"]]) + recv(sockc, mode = 6L, block = .limit_long) && stop(._[["sync_timeout"]]) saio <- send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L) cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L) } @@ -324,10 +324,9 @@ get_tls <- function(baseurl, tls, pass) { sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE) -query_dispatcher <- function(sock, command, mode) { - send(sock, data = command, mode = 2L, block = .timelimit) - recv(sock, mode = mode, block = .timelimit) -} +query_dispatcher <- function(sock, command, mode, block = .limit_short) + if (r <- send(sock, data = command, mode = 2L, block = block)) r else + recv(sock, mode = mode, block = block) create_req <- function(ctx, cv) list(ctx = ctx, req = recv_aio_signal(ctx, cv = cv, mode = 8L)) diff --git a/R/mirai-package.R b/R/mirai-package.R index c824e8b67..5f94cb778 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -143,7 +143,8 @@ registerPromisesMethods <- function(pkgname, pkgpath) { ) .intmax <- .Machine[["integer.max"]] -.timelimit <- 10000L +.limit_short <- 5000L +.limit_long <- 10000L as.promise <- NULL recvData <- NULL