Superfluous launches in the case of fully transient workers #51

Closed
wlandau opened this issue Mar 21, 2023 Discussed in #50 · 32 comments

Comments

@wlandau
Owner

wlandau commented Mar 21, 2023

Promoting #50 to an issue so it is more visible. See reprex below. I am not sure where the problem comes from. Things may improve with shikokuchuo/mirai#38, and they may improve if we can figure out a way to use mirai's own sockets instead of custom bus sockets to detect when particular new instances of worker processes connect.

library(crew)
crew_session_start()
x <- crew_controller_callr(
  tasks_max = 1L,
  workers = 1L
)
x$start()
for (index in seq_len(100L)) {
  name <- paste0("task_", index)
  x$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
x$wait()
results <- NULL
while (!is.null(out <- x$pop(scale = FALSE))) {
  if (!is.null(out)) {
    results <- dplyr::bind_rows(results, out)
  }
}
length(unique(results$socket_session))
#> [1] 100
utils::stack(x$summary(-contains(c("controller", "seconds"))))
#> values              ind
#> 1  ws://10.0.0.9:54499    worker_socket
#> 2                FALSE worker_connected
#> 3                FALSE      worker_busy
#> 4                  114  worker_launches # It is not usually this high, but it is always a little over 100 (e.g. 105 in other test runs)
#> 5                  101 worker_instances
#> 6                    0   tasks_assigned
#> 7                    1   tasks_complete
#> 8                  100     popped_tasks
#> 9                    0    popped_errors
#> 10                   0  popped_warnings
x$terminate()
@wlandau
Owner Author

wlandau commented Mar 21, 2023

To do: test in a temporary fork that uses mirai sockets instead of bus sockets to look for active workers. That may help isolate the problem in crew if the bug is on my end.

@wlandau
Owner Author

wlandau commented Mar 23, 2023

Update: although other performance tests and benchmarks have improved immensely with mirai 0.8.1.9002 and nanonext 0.8.0.9001, this particular test is still showing too many launches for the number of tasks completed by persistent workers.

@wlandau
Owner Author

wlandau commented Mar 26, 2023

In https://github.com/wlandau/crew/compare/main..096e6727cf2715983e7cb347c05748ca103a31aa, I try switching to mirai::daemons() to monitor connectivity instead of my custom NNG sockets, and it doesn't appear to work.

@wlandau
Owner Author

wlandau commented Mar 26, 2023

I did do some work to clean up the controller and launcher logic, now in branch main. I repeated the test above, and I still notice a couple superfluous worker launches. In addition, I occasionally see errors under results$error:

"'miraiError' chr Error in envir[[\".expr\"]]: subscript out of bounds"

Maybe I can reproduce the error with just mirai outside crew?

@wlandau
Owner Author

wlandau commented Mar 26, 2023

Got a reprex, not of superfluous worker launches, but of the mirai errors:

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000")
tasks <- lapply(seq_len(100L), function(x) {
  mirai(x, x = x)
})
results <- list()
px <- NULL
launches <- 0L
while(length(results) < 100L) {
  if (is.null(px) || !px$is_alive()) {
    px <- callr::r_bg(function() {
      mirai::server("ws://127.0.0.1:5000", maxtasks = 1L)
    })
    launches <- launches + 1L
  }
  done <- integer(0L)
  for (i in seq_along(tasks)) {
    if (!.unresolved(tasks[[i]])) {
      done <- c(done, i)
      results[[length(results) + 1L]] <- tasks[[i]]
    }
  }
  tasks[done] <- NULL
}
print(launches)
#> [1] 100
data <- as.character(lapply(results, function(x) x$data))
print(data)
#> [1] "1"                                                   
#> [2] "2"                                                   
#> [3] "3"                                                   
#> [4] "4"                                                   
#> [5] "5"                                                   
#> [6] "6"                                                   
#> [7] "7"                                                   
#> [8] "Error in envir[[\".expr\"]]: subscript out of bounds"
#> ...
sum(grepl("^Error", data))
#> [1] 24
daemons(0L)

@wlandau
Owner Author

wlandau commented Mar 26, 2023

Just posted shikokuchuo/mirai#43 with the above reprex. After it's fixed, I will try the original reprex from this thread again and see how much closer to solved it is.

@wlandau
Owner Author

wlandau commented Mar 27, 2023

A smaller version of the same test code is below. It seems to work on my MacBook.

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000")
launches <- 0L
pids <- integer(0L)
while (length(pids) < 100L) {
  if (!exists("px") || !px$is_alive()) {
    px <- callr::r_bg(\() mirai::server("ws://127.0.0.1:5000", maxtasks = 1L))
    launches <- launches + 1L
  }
  if (!exists("m") || !.unresolved(m)) {
    if (exists("m")) pids <- c(pids, m$data)
    m <- mirai(ps::ps_pid())
  }
}
print(launches)
daemons(n = 0L)

@wlandau
Owner Author

wlandau commented Mar 27, 2023

After the improvements I made yesterday, I observed that nearly all of the superfluous worker launches went away when I removed manual worker termination. (In tests/throughput/test-transient.R, there is one final worker which launches but completes no tasks, and I am not sure why or if it can be avoided.)

I think I may need to implement an exit delay similar to the exitlinger argument of mirai::server(). This can be fully asynchronous if I move exiting workers into a special queue and then manually terminate them only after a timeout is reached.
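
A minimal sketch of the delayed-exit idea, assuming each worker handle exposes a kill() method (as callr/processx process objects do). The names enqueue_exit(), flush_exits(), and seconds_linger are hypothetical and are not part of crew, and the mock handle stands in for a real process:

```r
# Hypothetical sketch, not crew code: exiting workers wait in a queue and are
# force-terminated only after a grace period (seconds_linger) has elapsed.

# Add a worker handle to the queue along with the time it was queued.
enqueue_exit <- function(queue, handle, now = Sys.time()) {
  queue[[length(queue) + 1L]] <- list(handle = handle, queued = now)
  queue
}

# Kill every queued worker whose grace period is over; return the rest.
flush_exits <- function(queue, seconds_linger, now = Sys.time()) {
  expired <- vapply(
    queue,
    function(entry) {
      as.numeric(difftime(now, entry$queued, units = "secs")) >= seconds_linger
    },
    FUN.VALUE = logical(1L)
  )
  for (entry in queue[expired]) {
    entry$handle$kill()
  }
  queue[!expired]
}

# Demo with mock handles that only record which worker was killed.
killed <- character(0L)
mock_handle <- function(id) list(kill = function() killed <<- c(killed, id))
t0 <- as.POSIXct("2023-03-27 00:00:00", tz = "UTC")
queue <- enqueue_exit(list(), mock_handle("a"), now = t0)
queue <- enqueue_exit(queue, mock_handle("b"), now = t0 + 4)
remaining <- flush_exits(queue, seconds_linger = 5, now = t0 + 6)
# Worker "a" has waited 6 seconds and is killed; "b" has waited 2 and stays.
```

Because flush_exits() only compares timestamps, a controller could call it on every scaling pass without blocking.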

Or maybe I should only force-quit the workers that never connect within the startup window. Once a worker connects to a custom crew bus socket, we are reasonably sure it can connect to mirai as well, and then mirai can close it. But if R never starts, then the worker could be stuck in a crashed state and might need help exiting.

@wlandau
Owner Author

wlandau commented Mar 27, 2023

@shikokuchuo, in these tests, I consistently see mirai servers with tasks_assigned = 0 and tasks_complete = 1. I did not think this would be possible because I thought all completed tasks would have had to be assigned first. I realize this is a challenging question without a reprex, but do you know what might be happening?

@shikokuchuo
Contributor

This is probably just a function of when the information snapshot is taken. I mean the important thing is that the work is actually being done, and I am confident from my own tests that it is. This is probably a next milestone thing: whether it makes sense to maintain a cumulative record at mirai of all server instances.

@wlandau
Owner Author

wlandau commented Mar 27, 2023

Thanks for explaining.

I added some thoughts to #51 (comment):

Or maybe I should only force-quit the workers that never connect within the startup window. Once a worker connects to a custom crew bus socket, we are reasonably sure it can connect to mirai as well, and then mirai can close it. But if R never starts at all, then the worker could be stuck in a crashed state and might need help exiting.

It would really help with this to have an optional connection timeout in server(). Would this be feasible in mirai? That way, if server() starts and finds no client to connect to, it can exit after a short time without consuming unnecessary resources. In normal usage, crew will never launch a worker without a mirai client already running, but it is possible for the user to launch a worker and then terminate the whole controller before the worker starts.
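
To illustrate the kind of connection timeout I mean, here is a rough sketch in plain R that polls for a connection and gives up after a fixed window. The names wait_for_connection() and is_connected are hypothetical, and this does not use or describe any actual mirai::server() argument:

```r
# Hypothetical sketch: poll is_connected() until it returns TRUE or until
# seconds_timeout has elapsed. Returns TRUE on success and FALSE on timeout.
wait_for_connection <- function(
  is_connected,
  seconds_timeout,
  seconds_interval = 0.01
) {
  deadline <- Sys.time() + seconds_timeout
  while (Sys.time() < deadline) {
    if (isTRUE(is_connected())) {
      return(TRUE)
    }
    Sys.sleep(seconds_interval)
  }
  FALSE
}

# Demo with stand-in connectivity checks.
connected_now <- wait_for_connection(function() TRUE, seconds_timeout = 0.05)
never_connected <- wait_for_connection(function() FALSE, seconds_timeout = 0.05)
# A worker would exit when the result is FALSE instead of waiting forever.
```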

@wlandau
Owner Author

wlandau commented Mar 27, 2023

Alternatively, I would understand if you prefer to use the existing idle time for this. I just thought a separate connection timeout could be shorter and add safety to completely persistent workers (which have idletime = Inf).

@wlandau
Owner Author

wlandau commented Mar 27, 2023

Hmm...maybe I don't need an extra connection timeout in mirai::server(). I think I could actually implement one in crew::crew_worker() using the custom crew bus sockets you proposed in a previous thread. (I am still thankful for that critical workaround!) That would avoid starting mirai::server() in the first place unless crew is listening for the specific worker instance with the right token (previously UUID). I actually like this workaround more.

@shikokuchuo
Contributor

OK. What you propose in terms of the timeout is certainly feasible. Currently everything seems very robust in terms of tests etc. so I would like to get a stable version of 'mirai' on to CRAN this week as soon as possible, and then start making changes from there. So you have a bit of time to think on it.

@wlandau
Owner Author

wlandau commented Mar 27, 2023

Awesome! From my experiments, I am suspecting more and more that the original issue in this thread comes from crew. There is still a mystery worker which does not seem to run or accept any tasks, but that's not as big a problem as the original.

@wlandau
Owner Author

wlandau commented Mar 27, 2023

On second thought, I actually do think it would very much help to have a mirai::server() connection timeout. Otherwise I can foresee a race condition:

  1. mirai client starts.
  2. crew bus socket on the worker successfully connects.
  3. mirai client exits.
  4. crew worker calls mirai::server().

@wlandau
Owner Author

wlandau commented Mar 28, 2023

I think I almost have this one solved now. Most of the superfluous launches are gone. There is just one dangling worker that seems to want to show up at the very end. I am not yet sure why, but this is now a simpler and less serious problem.

@wlandau
Owner Author

wlandau commented Mar 28, 2023

This bug, if it even is a bug anymore, is becoming extremely hard to reproduce (a very good problem).

@wlandau
Owner Author

wlandau commented Mar 28, 2023

Based on earlier hard-to-pin-down test failures, maybe this smaller case could help:

library(crew)
crew_session_start()
x <- crew_controller_callr(workers = 1L, tasks_max = 1L)
x$start()
x$push(ps::ps_pid())
x$wait(mode = "all")
pid_out <- x$pop(scale = FALSE)$result[[1]]
pid_exp <- x$launcher$workers$handle[[1]]$get_pid()
identical(pid_out, pid_exp)
x$terminate()
crew_session_terminate()

@wlandau
Owner Author

wlandau commented Mar 28, 2023

And a controller group version:

library(crew)
crew_session_start()
for (i in seq_len(100)) {
  print(i)
  x <- crew_controller_group(
    crew_controller_callr(workers = 1L, tasks_max = 1L, name = "a"),
    crew_controller_callr(workers = 1L, tasks_max = 1L, name = "b")
  )
  x$start()
  x$push(ps::ps_pid(), controller = "b")
  x$wait(mode = "all")
  pid_out <- x$pop(scale = FALSE)$result[[1]]
  pid_exp <- x$controllers[["b"]]$launcher$workers$handle[[1]]$get_pid()
  stopifnot(identical(pid_out, pid_exp))
  x$terminate()
}
crew_session_terminate()

@wlandau
Owner Author

wlandau commented Mar 28, 2023

It takes several iterations, but eventually I get a PID mismatch in the above test. When that happens, there is a worker still running with pid_exp, but there is no process remaining with pid_out. Even though the process with pid_out no longer exists, I am confident that pid_out once belonged to a valid worker because the task reports valid socket_data and socket_session fields which could have only been set inside crew_worker().

I think this means there was a slight mismatch between workers and tasks and the task got done anyway. At worst, it's an off-by-one error because crew does not micromanage the dispatch of tasks to mirai daemons.

@wlandau
Owner Author

wlandau commented Mar 28, 2023

I wonder if I can reproduce this with just mirai, or if it has to do with something in crew.

@wlandau
Owner Author

wlandau commented Mar 29, 2023

This appears to be working well:

library(callr)
library(crew) # for crew_wait()
library(mirai)
daemons(n = 1L, url = "ws://127.0.0.1:5000", .compute = "a")
daemons(n = 1L, url = "ws://127.0.0.1:5001", .compute = "b")
for (i in seq_len(100)) {
  print(i)
  m <- mirai(ps::ps_pid(), .compute = "b")
  px <- r_bg(\() mirai::server("ws://127.0.0.1:5001", maxtasks = 1L))
  crew_wait(~!unresolved(m), seconds_interval = 0.001, seconds_timeout = 5)
  stopifnot(identical(m$data, px$get_pid()))
}
daemons(n = 0L, .compute = "a")
daemons(n = 0L, .compute = "b")

@wlandau
Owner Author

wlandau commented Mar 29, 2023

This example with just mirai seems to produce the correct number of launches.

library(callr)
library(mirai)
library(nanonext)
library(purrr)
daemons(n = 4L, url = "ws://127.0.0.1:5005")
urls <- rownames(daemons()$daemons)
tasks <- map(seq_len(200), ~mirai(ps::ps_pid()))
launches <- rep(0L, 4L)
workers <- as.list(rep(FALSE, 4L))
names(launches) <- urls
names(workers) <- urls
while ((pending <- sum(map_lgl(tasks, .unresolved))) > 0L) {
  online <- daemons()$daemons[, "status_online"]
  disconnected <- names(online)[online < 1L]
  relaunch <- head(disconnected, n = pending)
  for (url in relaunch) {
    w <- workers[[url]]
    elapsed <- 10
    if (!isFALSE(w)) {
      elapsed <- difftime(Sys.time(), w$get_start_time(), units = "secs")
    }
    if (isFALSE(w) || (!w$is_alive() && (elapsed > 5))) {
      px <- r_bg(\(url) mirai::server(url, maxtasks = 1L), args = list(url = url))
      workers[[url]] <- px
      launches[url] <- launches[url] + 1L
    }
  }
  Sys.sleep(0.001)
}
daemons(n = 0L)

@wlandau
Owner Author

wlandau commented Mar 29, 2023

Successfully reproduced the mismatch without controller groups:

library(crew)
crew_session_start()
x <- crew_controller_callr(workers = 1L, tasks_max = 1L, name = "a")
x$start()
for (i in seq_len(100)) {
  print(i)
  x$push(ps::ps_pid(), scale = FALSE)
  x$wait(mode = "all")
  pid_out <- x$pop(scale = FALSE)$result[[1]]
  pid_exp <- x$launcher$workers$handle[[1]]$get_pid()
  stopifnot(identical(pid_out, pid_exp))
}
x$terminate()
crew_session_terminate()

@wlandau
Owner Author

wlandau commented Mar 29, 2023

5a1f823 eliminates the remaining superfluous worker launches: right before a new launch, do a last-minute check to see if there is already a worker connected. I think this works because auto-scaling data may be out of date by the time the actual launch is reached.
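
A simplified sketch of the last-minute check, not the actual code in 5a1f823. The names launch_if_needed(), is_connected, and launch are hypothetical stand-ins for the launcher's real connectivity check and launch step:

```r
# Hypothetical sketch: re-check connectivity immediately before launching,
# because the auto-scaling snapshot may already be out of date.
launch_if_needed <- function(is_connected, launch) {
  if (isTRUE(is_connected())) {
    return(FALSE) # A worker dialed in after the snapshot, so skip the launch.
  }
  launch()
  TRUE
}

# Demo: the earlier snapshot saw no worker, but one has connected since then.
launches <- 0L
snapshot_connected <- FALSE          # what auto-scaling observed earlier
live_connected <- function() TRUE    # what a fresh check observes now
launched <- NA
if (!snapshot_connected) {
  launched <- launch_if_needed(
    is_connected = live_connected,
    launch = function() launches <<- launches + 1L
  )
}
# launched is FALSE and launches stays at 0L: no superfluous worker.
```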

@wlandau
Owner Author

wlandau commented Mar 29, 2023

Or maybe that just masks the problem. The current which_active() checks "connected" status before it checks "discovered" status. Between those two checks, a worker could dial in.

@wlandau wlandau reopened this Mar 29, 2023
@shikokuchuo
Contributor

shikokuchuo commented Mar 29, 2023

What is your latency here - is it because you are only polling occasionally or do you need faster updates than calling stat()? The latter can be addressed by registering pipe events (pipe_notify()) with a cv - you get instant real time updates.

Sorry this probably doesn't help, but this new feature is just so cool :)

@wlandau
Owner Author

wlandau commented Mar 29, 2023

#51 itself does not need a faster alternative to stat(), but the prospect you describe does sound really exciting for performance! I am not sure I understand how to use condition variables myself, but I would be happy to take any suggestions for the utilities where listeners and dialers check each other:

crew/R/utils_nanonext.R

Lines 51 to 69 in 18a8b28

dialer_connected <- function(listener) {
  connection_opened(listener) &&
    nanonext::stat(listener$listener[[1]], "pipes") > 0L
}

dialer_discovered <- function(listener) {
  connection_opened(listener) &&
    nanonext::stat(listener$listener[[1]], "accept") > 0L
}

dialer_not_discovered <- function(listener) {
  connection_closed(listener) ||
    nanonext::stat(listener$listener[[1]], "accept") < 1L
}

listener_connected <- function(dialer) {
  connection_opened(dialer) &&
    nanonext::stat(dialer$dialer[[1]], "pipes") > 0L
}

For #51 itself, the underlying issue turned out to be a really tricky race condition in the crew auto-scaling logic (not a problem in mirai at all). The following scenario was happening:

  1. Worker is not connected to the bus socket.
  2. Client checks stat(connection$listener[[1]], "pipes") and observes no worker is connected.
  3. Worker connects.
  4. Client checks stat(connection$listener[[1]], "accept") and observes that a worker connected at some point in the past.
  5. Client incorrectly reasons from (2) and (4) that a worker connected and then disconnected.
  6. Meanwhile, the worker continues to be connected.
  7. Client incorrectly re-launches a new (superfluous) worker at the socket.

All I needed to do was switch the order of (2) and (4). Implemented in 18a8b28. Before 18a8b28, I could reliably reproduce the issue using #51 (comment). Now, I do not see any superfluous launches in the same test, or in tests/throughput/test-transient.R.
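
The effect of the ordering can be reproduced in a small simulation. This is plain R with hypothetical names and no nanonext calls: pipes stands in for the current connection count, accept stands in for the cumulative count of accepted connections, and the between callback simulates a worker dialing in between the two reads.

```r
# Simulated listener statistics (a stand-in for the real NNG counters).
new_listener <- function() {
  state <- new.env()
  state$pipes <- 0L   # connections open right now
  state$accept <- 0L  # connections accepted so far (never decreases)
  state
}

# A worker dialing in bumps both counters.
connect <- function(listener) {
  listener$pipes <- listener$pipes + 1L
  listener$accept <- listener$accept + 1L
}

# A worker looks "lost" if it was discovered but is not connected.
# Old order: read "pipes" first, then "accept".
lost_pipes_first <- function(listener, between = function() NULL) {
  connected <- listener$pipes > 0L
  between()
  discovered <- listener$accept > 0L
  discovered && !connected
}

# New order: read "accept" first, then "pipes".
lost_accept_first <- function(listener, between = function() NULL) {
  discovered <- listener$accept > 0L
  between()
  connected <- listener$pipes > 0L
  discovered && !connected
}

a <- new_listener()
buggy <- lost_pipes_first(a, between = function() connect(a))
b <- new_listener()
fixed <- lost_accept_first(b, between = function() connect(b))
# buggy is TRUE (a live worker is misclassified as lost and relaunched);
# fixed is FALSE (the same worker is not misclassified).
```

Reading the cumulative counter first is safe in this model because it never decreases: a worker that dials in between the two reads can only make the later "pipes" read positive, so it cannot produce a false "connected and then disconnected" conclusion.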

What a relief to have solved this!

@shikokuchuo
Contributor

Yes, that's great, indeed swapping the order seems to work.

There might be some benefit from using condition variables, but only if latency or performance is important. You are basically registering callbacks to happen on each event - this is all asynchronous so it doesn't slow anything down, but you get the NNG stats 'for free', so I wouldn't implement unless there's a need.

But having said that, in the above, you are calling stat twice on each socket? If you have 500 sockets, that could be quite slow right? Reading the value of a condition variable would be almost instantaneous. I can give you pointers if you want to go down this route.

@wlandau
Owner Author

wlandau commented Mar 30, 2023

Yes, for "discovered" workers, I do end up calling stat() twice. I haven't had the chance to load test 500 workers yet because I have only tried local process workers, but I will take your word for it that it could be slow. {crew} currently assumes these checks are practically instantaneous, so any slight inefficiency at scale may be noticeable.

I would love to try condition variables for the {crew} utilities I linked to. If you would be willing to help me get started on cv-powered drop-in replacements for stat(), I would really appreciate it.

@wlandau
Owner Author

wlandau commented Mar 30, 2023

Opened #57 for this.
