Skip to content

Commit

Permalink
move dispatcher control (monitor) sockets to pair protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Jan 31, 2024
1 parent f2739bb commit dcdc759
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.12.0.9007
Version: 0.12.0.9008
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# mirai 0.12.0.9007 (development)
# mirai 0.12.0.9008 (development)

* Dispatcher sync (and status) timeouts widened to 10s to allow for launching large numbers of daemons.
* Dispatcher initial sync timeout widened to 10s to allow for launching large numbers of daemons.
* Default for `ssh_config()` argument 'timeout' widened to 10 (seconds).
* Fixes `daemons()` specifying 'output = FALSE' registering as TRUE instead.
* Fixes use of `everywhere()` specifying '.args' as an unnamed list or '.expr' as a language object.
Expand Down
8 changes: 4 additions & 4 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
urld <- local_url()
urlc <- strcat(urld, "c")
sock <- req_socket(urld, resend = 0L)
sockc <- req_socket(urlc, resend = 0L)
sockc <- socket(protocol = "pair", listen = urlc)
launch_and_sync_daemon(sock, wa5(urld, dots, n, urlc, url), output, tls, pass) || stop(._[["sync_timeout"]])
init_monitor(sockc = sockc, envir = envir) || stop(._[["sync_timeout"]])
} else {
Expand Down Expand Up @@ -329,7 +329,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
if (dispatcher) {
sock <- req_socket(urld, resend = 0L)
urlc <- strcat(urld, "c")
sockc <- req_socket(urlc, resend = 0L)
sockc <- socket(protocol = "pair", listen = urlc)
launch_and_sync_daemon(sock, wa4(urld, dots, envir[["stream"]], n, urlc), output) || stop(._[["sync_timeout"]])
for (i in seq_len(n)) next_stream(envir)
init_monitor(sockc = sockc, envir = envir) || stop(._[["sync_timeout"]])
Expand Down Expand Up @@ -551,11 +551,11 @@ launch_and_sync_daemon <- function(sock, args, output, tls = NULL, pass = NULL)
}
}
launch_daemon(args, output)
until(cv, .timelimit)
until(cv, .limit_long)
}

init_monitor <- function(sockc, envir) {
res <- query_dispatcher(sockc, command = FALSE, mode = 2L)
res <- query_dispatcher(sockc, command = FALSE, mode = 2L, block = .limit_long)
valid <- !is.object(res)
if (valid) `[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L]))
valid
Expand Down
11 changes: 5 additions & 6 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,

ctrchannel <- is.character(monitor)
if (ctrchannel) {
sockc <- socket(protocol = "rep")
sockc <- socket(protocol = "pair")
on.exit(reap(sockc), add = TRUE, after = FALSE)
pipe_notify(sockc, cv = cv, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
recv(sockc, mode = 6L, block = .timelimit) && stop(._[["sync_timeout"]])
recv(sockc, mode = 6L, block = .limit_long) && stop(._[["sync_timeout"]])
saio <- send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
}
Expand Down Expand Up @@ -324,10 +324,9 @@ get_tls <- function(baseurl, tls, pass) {

sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE)

query_dispatcher <- function(sock, command, mode) {
send(sock, data = command, mode = 2L, block = .timelimit)
recv(sock, mode = mode, block = .timelimit)
}
query_dispatcher <- function(sock, command, mode, block = .limit_short)
if (r <- send(sock, data = command, mode = 2L, block = block)) r else
recv(sock, mode = mode, block = block)

create_req <- function(ctx, cv)
list(ctx = ctx, req = recv_aio_signal(ctx, cv = cv, mode = 8L))
3 changes: 2 additions & 1 deletion R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ registerPromisesMethods <- function(pkgname, pkgpath) {
)

.intmax <- .Machine[["integer.max"]]
.timelimit <- 10000L
.limit_short <- 5000L
.limit_long <- 10000L

as.promise <- NULL
recvData <- NULL
Expand Down

0 comments on commit dcdc759

Please sign in to comment.