Incorrect cumulative “complete” stats on Linux #90
I added a new test script in https://github.com/wlandau/crew/blob/main/tests/mirai/test-tallies.R (also inline below) which reproduces the bug on my Ubuntu and RHEL 7 machines. It faithfully represents what {crew} does when it submits tasks and tallies worker stats. The only piece of {crew} it relies on is the crew_schedule() data structure.
library(crew)
library(mirai)
# Implements throttling to avoid overburdening the {mirai} dispatcher.
throttler <- crew::crew_schedule()
# Efficient and convenient data structure to keep track of {mirai} tasks.
# It has a hash table for new tasks and a first-in/first-out linked list
# for resolved tasks. It calls nanonext::.unresolved() to collect resolved
# tasks, but otherwise it does not rely on {mirai}/{nanonext}. I highly doubt
# it is the source of the {crew} bugs in #88 or #89.
schedule <- crew::crew_schedule()
schedule$start()
# Start the {mirai} client.
n <- 20L
mirai::daemons(
n = n,
url = "ws://127.0.0.1:5000",
dispatcher = TRUE,
token = TRUE
)
# Mutable structure with {crew} worker info. This is the primary
# data structure of each {crew} launcher.
workers <- new.env(parent = emptyenv()) # For mutability.
workers$workers <- tibble::tibble(
handle = replicate(n, new.env(), simplify = FALSE), # callr::r_bg() handles
socket = environment(mirai::daemons)$..$default$urls, # starting URLs
launches = rep(0L, n), # number of times a worker was launched at this index
launched = rep(FALSE, n), # FALSE if the worker is definitely done.
assigned = rep(0L, n), # Cumulative "assigned" stat to check backlog (#79).
complete = rep(0L, n) # Cumulative "complete" stat to check backlog (#79).
)
# For {mirai} servers with online == 0L and instance == 1L,
# rotate the websocket URL. Also set workers$launched to FALSE,
# which signals that tally() can safely update the cumulative
# "assigned" and "complete" statistics (#79).
rotate <- function(workers) {
info <- mirai::daemons()$daemons
done <- which(info[, "online"] < 1L & info[, "instance"] > 0L)
for (index in done) {
socket <- mirai::saisei(i = index, force = FALSE)
if (!is.null(socket)) {
workers$workers$socket[index] <- socket # Next launch is at this URL.
workers$workers$launched[index] <- FALSE # Lets tally() update stats.
}
}
}
# For workers that are definitely done and not going to dial in until the
# next launch, update the cumulative "assigned" and "complete" which {crew}
# uses to detect backlogged workers (#79). A backlogged worker is a {mirai}
# server with more assigned than complete tasks. Detecting the backlog
# is important because if a worker is disconnected and backlogged,
# then {crew} will need to relaunch it so the backlogged tasks can run.
tally <- function(workers) {
info <- mirai::daemons()$daemons
# New stats from daemons()
new_assigned <- as.integer(info[, "assigned"])
new_complete <- as.integer(info[, "complete"])
# Current cumulative stats.
old_assigned <- workers$workers$assigned
old_complete <- workers$workers$complete
# Not all worker stats can be safely updated. We need to make sure
# the worker is completely done and the websocket is rotated.
# Otherwise, the counts could change between now and the next official
# launch of the worker. It is tricky to avoid this race condition.
index <- !(workers$workers$launched) # Workers safe to update.
workers$workers$assigned[index] <- old_assigned[index] + new_assigned[index]
workers$workers$complete[index] <- old_complete[index] + new_complete[index]
invisible()
}
# In {crew}, the scale() method of the launcher class
# re-launches all backlogged non-launched workers,
# and then it may launch additional non-launched workers
# in order to meet the demand of the task load.
# The scale() function below is a simplified version which launches
# all non-launched workers.
scale <- function(workers) {
for (index in which(!workers$workers$launched)) { # non-launched workers
# I would have used mirai::launch_server() here, but callr::r_bg()
# allows me to manually terminate the server without calling
# mirai::daemons(n = 0L). This is important for updating the final
# assigned and complete tallies later on.
workers$workers$handle[[index]] <- callr::r_bg(
func = function(url) mirai::server(url = url, maxtasks = 100L),
args = list(url = workers$workers$socket[index])
)
# Increment the launch count.
workers$workers$launches[index] <- workers$workers$launches[index] + 1L
# Signal to tally() to wait for this worker to complete
# instead of updating the cumulative assigned and complete stats.
workers$workers$launched[index] <- TRUE
}
}
index <- 0L # current task
n_tasks <- 60000L # all tasks
while (index < n_tasks || schedule$nonempty()) { # while there is work to do
if (!throttler$throttle()) { # avoid overburdening the {mirai} dispatcher
rotate(workers) # Rotate the URLs of done workers.
tally(workers) # Update the cumulative stats for done workers.
scale(workers) # Re-launch all the done workers.
}
# If there are still tasks to launch, launch one.
if (index < n_tasks) {
index <- index + 1L
cat("push", index, "\n")
task <- mirai(index, index = index)
# The "schedule" is nothing fancy for the purposes of #88 and #89,
# it is just a fast data structure for bookkeeping {mirai} objects
# without the other frills in {crew}.
schedule$push(task)
}
# Try to process the results of finished tasks.
if (schedule$nonempty()) { # If there are still tasks to process...
# Call nanonext::.unresolved() and move resolved tasks
# from the hash table in schedule$pushed to the first-in/first-out
# linked list in schedule$collected.
schedule$collect()
task <- schedule$pop() # Return a task that was resolved and collected.
# pop() returns NULL if there is no resolved/collected task.
if (!is.null(task)) {
cat("pop", task$data, "\n")
}
}
}
# Manually terminate the workers without calling mirai::daemons(n = 0L).
# This allows the final tally to be updated correctly.
for (handle in workers$workers$handle) {
if (inherits(handle, "r_process") && handle$is_alive()) {
handle$kill()
}
}
# Update the final tally and clean up the dispatcher.
rotate(workers)
tally(workers)
daemons(n = 0L)
# The cumulative assigned and complete statistics should be equal for
# each worker.
print("worker info")
print(workers$workers[, c("launches", "assigned", "complete")])
# Should equal n_tasks.
print("total assigned")
print(sum(workers$workers$assigned))
# Should equal n_tasks.
print("total complete")
print(sum(workers$workers$complete))
# The backlog should be 0 for all workers.
print("backlog per worker")
print(table(workers$workers$assigned - workers$workers$complete))
Yes, #87 broke things, but it also made it much easier to create reproducible examples like the one above.
The discrepancy between the cumulative assigned and complete stats is still there.
I tested 34800d9 (just before #87) and it turns out that the cumulative stats issue was present before the major refactor I did last week. @shikokuchuo, I have been assuming that the correct behavior is for each worker's cumulative assigned and complete counts to agree once all the tasks have finished.
The stats on the dispatcher are actually not complicated. They are just counters that are incremented: assigned when a task is sent to a server, and complete when the result comes back.
This is all done in R, so nothing to do with buffered messages or anything like that.
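In other words, the dispatcher-side bookkeeping is conceptually just something like the following (a hypothetical sketch for illustration, not the actual {mirai} source):
# Hypothetical sketch of the dispatcher-side counters, one pair per server URL.
stats <- c(assigned = 0L, complete = 0L)
stats["assigned"] <- stats["assigned"] + 1L # incremented when a task is sent to the server
stats["complete"] <- stats["complete"] + 1L # incremented when the server returns a result
# A per-instance design resets these counters on server turnover;
# a cumulative design just keeps adding to them.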
That helps, thanks. As you say, straightforward and not at the C or NNG level. I started keeping track of individual snapshots of the daemons() stats each time a server instance finished. Some servers show 101 tasks assigned, which makes sense because task dispatch and server termination can be asynchronous. And indeed, the total assigned stat has always shown the correct number of tasks, so I don't think there is anything to worry about with regard to assigned. But the complete stats come up short, as the script below shows.
library(crew)
library(mirai)
packageVersion("nanonext")
#> [1] ‘0.9.0.9008’
packageVersion("mirai")
#> [1] ‘0.8.7.9027’
packageVersion("crew")
#> [1] ‘0.2.1.9011’
throttler <- crew::crew_schedule()
schedule <- crew::crew_schedule()
schedule$start()
mirai::daemons(
n = 20L,
url = "ws://127.0.0.1:5000",
dispatcher = TRUE,
token = TRUE
)
# Start all servers right away to simplify things.
for (url in environment(mirai::daemons)$..$default$urls) {
launch_server(url = url, maxtasks = 100L)
}
envir <- new.env(parent = emptyenv())
envir$assigned <- list()
envir$complete <- list()
envir$workers <- list()
# scale() takes a snapshot of the stats of the servers.
# If the server is online (or never launched) then
# assigned and complete are 0. Otherwise, we note the values from
# daemons() at that moment. So any stats above 0 are the final
# stats from the last completed instance of the server.
scale <- function(envir) {
info <- as.data.frame(daemons()$daemons)
done <- info$online < 1L & info$instance > 0L
envir$assigned[[length(envir$assigned) + 1L]] <- done * info$assigned
envir$complete[[length(envir$complete) + 1L]] <- done * info$complete
for (index in which(done)) {
url <- saisei(i = index)
if (!is.null(url)) {
envir$workers[[index]] <- callr::r_bg(
func = function(url) mirai::server(url = url, maxtasks = 100L),
args = list(url = url)
)
}
}
}
# Run the tasks.
index <- 0L
n_tasks <- 60000L
while (index < n_tasks || schedule$nonempty()) {
if (!throttler$throttle()) {
scale(envir)
}
if (index < n_tasks) {
index <- index + 1L
cat("push", index, "\n")
task <- mirai(index, index = index)
schedule$push(task)
}
if (schedule$nonempty()) {
schedule$collect()
task <- schedule$pop()
if (!is.null(task)) {
cat("pop", task$data, "\n")
}
}
}
# Terminate all the servers and take 1 final snapshot.
purrr::walk(envir$workers, ~.x$kill())
scale(envir)
# Matrices with 1 column per server and 1 row per snapshot.
assigned <- do.call(rbind, envir$assigned)
complete <- do.call(rbind, envir$complete)
sum(assigned) # The assigned stats are okay.
#> [1] 60000
sum(complete) # The "complete" stats look too low because all the tasks actually finished.
#> [1] 59940
# Let's look at the distribution of tasks of workers that completed,
# excluding the ad hoc snapshot at the end.
# Sometimes more than 100 tasks were assigned, which makes sense.
table(assigned[-nrow(assigned), ])
#> 0 100 101
#> 5882 578 20
# But what would cause some self-terminating servers to count 99 complete tasks
# even though maxtasks = 100?
table(complete[-nrow(complete), ])
#> 0 99 100
#> 5882 59 539
daemons(0L)
That doesn't look right. As this is a throughput test, it might be possible that something like a timing issue is happening: an instance's stats are read and reset around the same moment its final results arrive, so the snapshot undercounts complete.
A different way to do this would be for the dispatcher to record the cumulative stats and not reset them. Then, in the above case, the assigned and complete totals would simply keep adding up across instances.
As it seems this statistic is not driving anything, it is just a matter of whether you want to ensure 'accounting' accuracy on your end from using these stats. It is also possible to log complete stats by counting the resolved tasks themselves, for example.
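For example, a minimal client-side counter might look like this (a hypothetical sketch reusing the schedule object from the scripts above):
# Hypothetical sketch: count completed tasks on the client side
# instead of relying on the dispatcher's "complete" stat.
completed <- 0L
schedule$collect() # move resolved tasks into the collected list
task <- schedule$pop() # returns NULL if nothing has resolved yet
if (!is.null(task)) {
  completed <- completed + 1L # one more task confirmed complete
}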
Incidentally, shikokuchuo/mirai@2200fac helps, reducing the discrepancy a lot but not eliminating it altogether.
It would be amazing if I could get cumulative assigned and complete stats directly from the dispatcher. As well as robustly solving this issue so crew can auto-scale more accurately, it would simplify crew a whole lot and reduce polling.
I think I am doing all I can using the information from the dispatcher. Accurate accounting matters for auto-scaling because, as we found earlier, backlogged workers need to be re-launched. Workers incorrectly flagged as backlogged will be relaunched unnecessarily, and that could waste resources.
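For reference, the backlog check in question is essentially the following (a simplified sketch based on the workers tibble in the test script above, not the actual {crew} implementation):
# Simplified sketch of the backlog check, reusing the workers tibble from the script above.
backlog <- workers$workers$assigned - workers$workers$complete
relaunch <- which(backlog > 0L & !workers$workers$launched)
# If "complete" is undercounted, backlog stays spuriously positive and these
# workers get relaunched even though they have no unfinished tasks.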
Counting the resolved tasks myself would need to be done on a server-by-server basis. Unless I am missing something, I think it would be difficult to attribute all resolved tasks to the servers they ran on for the purposes of assessing backlog. I would need to download all the data from all the resolved tasks and then look for the metadata from crew, which tracks this using its own environment variables. For large data objects or lots of tasks, I do not think crew could keep up.
This is done in shikokuchuo/mirai@1251a54. Interestingly, this doesn't actually break the {crew} tests, but you will have to re-work your scaling logic based on this. I also tested your original example above #90 (comment) and it still completes fine. If you call daemons(n = 0L), everything is torn down and the stats still reset at that point.
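Assuming daemons() now reports cumulative assigned and complete counts, the reworked scaling check could reduce to something like this (a hypothetical sketch, not the actual {crew} logic):
# Hypothetical scaling check against cumulative dispatcher stats.
info <- mirai::daemons()$daemons
backlogged <- info[, "assigned"] > info[, "complete"] # unfinished tasks remain at this URL
offline <- info[, "online"] < 1L # no server currently connected at this URL
relaunch <- which(backlogged & offline) # candidates for launching a new instance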
Wow, this is absolutely perfect! Thank you so much! I think it doesn't break the existing {crew} tests.
Great that it all worked out, as it simplifies things on both ends! I would have suggested this sooner, but I was under the impression you needed the per-instance stats to drive your autoscaling. I am going to strip out refhook (for now) and attempt another CRAN release of mirai as soon as you confirm #88 is fixed. This should simplify development going forward and provide a good base for adding further features such as TLS (almost ready in nanonext).
Similarly - I should just ask - do you need the instance counter to reset to 0 when saisei() is called? That seems to be the only other place these stats reset. This shouldn't cause any issues, but with all the async stuff that is going on, it seems cleaner if this can be avoided.
I have no need for the instance counter to reset to 0.
A CRAN release of
Yes, I was referring specifically to
That's a good timeframe. I am all prepared on my end. Also for the record I re-implemented a more ambitious fix in shikokuchuo/mirai@2200fac, which aims not to waste any cycles.
Related to #88. When I run https://github.com/wlandau/crew/blob/main/tests/throughput/test-backlog-tasks_max.R on my local Ubuntu machine, I always see incorrect (too low) assigned and complete stats. I occasionally see complete too low by 1 or 2 on a remote RHEL 7 node. Can't reproduce it on my local MacBook.