Hanging tasks in a targets pipeline #75
This is related to shikokuchuo/mirai#58 and shikokuchuo/mirai#53. Elusive intermittent problems like this have been persisting for weeks, and I have no idea how to solve them. Every time I try to simplify a reproducible example and peel back layers, everything starts working. The only thing I can think of to do at this point is to fork |
So far, this only happens when there are a lot of |
I don't think that would be fruitful as we've just seen the problem isn't likely to be within I wonder if it worked for you when you removed When it stops for me - it does after about 10 or so IDs, there is no error message. But my laptop fan turns on and if I check htop I see activity on all cores, even after quite some time and stays like this. This seems to suggest something is stuck in a (moderate) spin cycle somewhere. I say moderate as the CPUs aren't exactly maxing out, but enough for the fan to constantly be on. |
> sessionInfo()
R version 4.3.0 (2023-04-21)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 22.04.2 LTS
Matrix products: default
BLAS/LAPACK: /opt/intel/oneapi/mkl/2023.1.0/lib/intel64/libmkl_rt.so.2; LAPACK version 3.10.1
locale:
[1] LC_CTYPE=en_GB.UTF-8 LC_NUMERIC=C LC_TIME=en_GB.UTF-8 LC_COLLATE=en_GB.UTF-8
[5] LC_MONETARY=en_GB.UTF-8 LC_MESSAGES=en_GB.UTF-8 LC_PAPER=en_GB.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C LC_MEASUREMENT=en_GB.UTF-8 LC_IDENTIFICATION=C
time zone: Europe/London
tzcode source: system (glibc)
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] crew_0.1.1.9006
loaded via a namespace (and not attached):
[1] vctrs_0.6.2 cli_3.6.1 knitr_1.42 rlang_1.1.1 xfun_0.39
[6] processx_3.8.1 targets_1.0.0.9001 data.table_1.14.8 glue_1.6.2 nanonext_0.8.3.9001
[11] mirai_0.8.7.9001 backports_1.4.1 ps_1.7.5 fansi_1.0.4 tibble_3.2.1
[16] base64url_1.4 yaml_2.3.7 lifecycle_1.0.3 compiler_4.3.0 codetools_0.2-19
[21] igraph_1.4.2 pkgconfig_2.0.3 rstudioapi_0.14 getip_0.1-3 digest_0.6.31
[26] R6_2.5.1 tidyselect_1.2.0 utf8_1.2.3 pillar_1.9.0 callr_3.7.3
[31] magrittr_2.0.3 tools_4.3.0 |
How I might approach it is: (i) the current arguments (seconds_launch = 120, workers = 20L, seconds_idle = 5, seconds_exit = 5) don't work, but (ii) presumably some set of arguments does. Start from the working example and change a single thing at a time until it breaks. |
Following my own advice, I removed all arguments except However, oddly exactly 19 IDs get printed, but if I check So to check if all the results are actually returned, I tried the following to collect the names:
library(crew)
controller <- crew::crew_controller_local(
workers = 20L,
)
controller$start()
for (i in seq_len(6000)) {
print(i)
controller$push(
command = targets:::target_run(target, globalenv(), "_targets"),
data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))),
)
tmp <- controller$pop()
}
names <- list()
while (!controller$empty()) {
out <- controller$pop()
if (!is.null(out)) {
names[[length(names) + 1L]] <- out$name
}
}
controller$terminate()
The really strange thing is when I inspect the list
> controller$log
# A tibble: 20 × 5
popped_tasks popped_seconds popped_errors popped_warnings controller
<int> <dbl> <int> <int> <chr>
1 5730 6.63 0 0 2b1d327c845473edc574c83593ec49e174102dad
2 97 0.146 0 0 2b1d327c845473edc574c83593ec49e174102dad
3 8 0.025 0 0 2b1d327c845473edc574c83593ec49e174102dad
4 4 0.042 0 0 2b1d327c845473edc574c83593ec49e174102dad
5 7 0.043 0 0 2b1d327c845473edc574c83593ec49e174102dad
6 6 0.025 0 0 2b1d327c845473edc574c83593ec49e174102dad
7 8 0.019 0 0 2b1d327c845473edc574c83593ec49e174102dad
8 4 0.014 0 0 2b1d327c845473edc574c83593ec49e174102dad
9 6 0.02 0 0 2b1d327c845473edc574c83593ec49e174102dad
10 25 0.101 0 0 2b1d327c845473edc574c83593ec49e174102dad
11 9 0.025 0 0 2b1d327c845473edc574c83593ec49e174102dad
12 5 0.015 0 0 2b1d327c845473edc574c83593ec49e174102dad
13 5 0.019 0 0 2b1d327c845473edc574c83593ec49e174102dad
14 10 0.02 0 0 2b1d327c845473edc574c83593ec49e174102dad
15 11 0.026 0 0 2b1d327c845473edc574c83593ec49e174102dad
16 26 0.071 0 0 2b1d327c845473edc574c83593ec49e174102dad
17 13 0.028 0 0 2b1d327c845473edc574c83593ec49e174102dad
18 5 0.016 0 0 2b1d327c845473edc574c83593ec49e174102dad
19 10 0.026 0 0 2b1d327c845473edc574c83593ec49e174102dad
20 11 0.021 0 0 2b1d327c845473edc574c83593ec49e174102dad |
This was again a bug in the testing script, and not so strange after all. The log does not lie and the 'missing' names were actually retrieved when
library(crew)
controller <- crew::crew_controller_local(
workers = 20L,
)
controller$start()
tmp <- list()
for (i in seq_len(6000)) {
print(i)
controller$push(
command = targets:::target_run(target, globalenv(), "_targets"),
data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))),
)
tmp[[length(tmp) + 1L]] <- controller$pop()
}
names <- list()
while (!controller$empty()) {
out <- controller$pop()
if (!is.null(out)) {
names[[length(names) + 1L]] <- out$name
}
}
controller$terminate()
The lists |
When I remove
> as.list(environment(mirai::daemons)$..)[[controller$router$name]]$urls
[1] "ws://10.0.0.100:44833/1/bd04f62038ee5a427854380633132a1e341d8df1"
[2] "ws://10.0.0.100:44833/2/e75a4d4cdf22a90aa1376f0cffb56c322826e148"
[3] "ws://10.0.0.100:44833/3/bcb54a27487e29c4937a2eaa4cf50c6d57fb9961"
[4] "\001"
[5] "ws://10.0.0.100:44833/5/765cd0485c20f6134a91754857302fcbeeace267"
[6] "ws://10.0.0.100:44833/6/2a1edd40efb40728153fafc187dcb13b87638b8e"
[7] "ws://10.0.0.100:44833/7/54018f30cb88bc887166c3b20295e4a891d2d814"
[8] "ws://10.0.0.100:44833/8/091fade50440a56d139b3568d52bcbb8289ced4d"
[9] "ws://10.0.0.100:44833/9/c87e60f05e3847b018a9b4bae8e710badf8f620a"
[10] "ws://10.0.0.100:44833/10/51558bf422b88aee4c46823e49f6d4271eba9ea5"
[11] "ws://10.0.0.100:44833/11/d298952ac3a432543e735c864743376df0e8411a"
[12] "ws://10.0.0.100:44833/12/69893838d7da504a154f2150c24d2c53ecadb89f"
[13] "ws://10.0.0.100:44833/13/cb6cf87635ce5a765d8034c95343e9015a2a8195"
[14] "ws://10.0.0.100:44833/14/eb948ce064ec9b3fed3f8bed1835631045856e7a"
[15] "ws://10.0.0.100:44833/15/f3ffd4ee05eaa8fcd68f1806b2320e9112266ba2"
[16] "ws://10.0.0.100:44833/16/e6c0159466116083760aca3dfdd1792d2563b504"
[17] "ws://10.0.0.100:44833/17/4403df7d7352f29718fdd4a0ad09054c73fdb576"
[18] "ws://10.0.0.100:44833/18/c952bb8aceff7e5b751b0b816edc2195cd9cadf3"
[19] "ws://10.0.0.100:44833/19/1fff8ebd2478e698f0a53a0068109f1b68f5df46"
[20] "ws://10.0.0.100:44833/20/941b05cf3cf3b6c5802ec278a8110ebc6cb05298" |
That's quite early. On my end, it's usually the 5900th task or so. |
Circumstances where I am confident:
|
I finally created a reproducible example that does not use
mirai_launcher_class <- R6::R6Class(
classname = "mirai_launcher_class",
inherit = crew::crew_class_launcher,
public = list(
launch_worker = function(call, launcher, worker, instance) {
settings <- eval(parse(text = call)[[1L]]$settings)
do.call(what = mirai::launch_server, args = settings)
}
)
)
crew_controller_mirai <- function(
name = "mirai",
workers = 1L,
host = NULL,
port = NULL,
seconds_launch = 30,
seconds_interval = 0.01,
seconds_timeout = 5,
seconds_idle = Inf,
seconds_wall = Inf,
seconds_exit = 1,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
auto_scale = "demand"
) {
router <- crew::crew_router(
name = name,
workers = workers,
host = host,
port = port,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- mirai_launcher_class$new(
name = name,
seconds_launch = seconds_launch,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
seconds_exit = seconds_exit,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection
)
controller <- crew::crew_controller(
router = router,
launcher = launcher,
auto_scale = auto_scale
)
controller$validate()
controller
}
library(crew)
controller <- crew_controller_mirai(
workers = 20L,
tasks_max = 100
)
controller$start()
names <- character(0L)
index <- 0L
n_tasks <- 6000L
while (index < n_tasks || !(controller$empty())) {
if (index < n_tasks) {
index <- index + 1L
cat("submit", index, "\n")
controller$push(
name = as.character(index),
command = TRUE
)
}
out <- controller$pop()
if (!is.null(out)) {
cat("collect", out$name, "\n")
names[[length(names) + 1L]] <- out$name
}
}
# unresolved tasks
lapply(controller$queue, function(x) x$handle[[1L]]$data)
controller$terminate() |
As I mentioned, Lines 219 to 225 in d696d43
Lines 303 to 305 in d696d43
Just now, when I ran #75 (comment) with the following change to
poll = function() {
out <- mirai::daemons(.compute = self$name)$daemons
if (!daemons_valid(out)) {
print(out)
stop("invalid daemons")
}
self$daemons <- out
invisible()
}
I see:
'errorValue' int 5 | Timed out
Error in self$router$poll() : invalid daemons
I will have to see if this happens in the motivating |
I've run on multiple machines and don't get this error. Have you compared the versions against the |
Alternatively, do you get all 6000 results as I described using the code I posted earlier: #75 (comment) |
It has always been the case that a |
This is tough to untangle because the corrupted URL seems to only explain the "Error in |
I am most concerned about this one: |
I just ran on Posit cloud free to check it's reproducible and there's nothing special about my setup. I installed The following code succeeds without error and prints out all 6000 IDs. There are only 2 workers as otherwise it runs out of memory:
library(crew)
controller <- crew::crew_controller_local(
seconds_launch = 120,
workers = 2L,
seconds_idle = 5,
seconds_exit = 5
)
controller$start()
for (i in seq_len(6000)) {
print(i)
controller$push(
command = targets:::target_run(target, globalenv(), "_targets"),
data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))),
)
#tmp <- controller$pop()
}
while (!controller$empty()) {
out <- controller$pop()
if (!is.null(out)) {
print(out$name)
}
}
controller$terminate()
|
Thanks for testing. I forgot to mention: except for GitHub Actions, these issues only seem to appear when the number of servers/workers is high, higher than probably necessary for the task load. With your version of the example from #75 (comment) which uses only 2 workers, everything seems to work. After 57c8174, which implements #75 (comment) permanently, the GitHub Actions |
From https://github.com/ropensci/targets/actions/runs/4983866816/jobs/8921453397, I actually see a variety of issues on GitHub Actions. Sometimes tests show |
I have compared
My Ubuntu machine and my company's RHEL7 SGE cluster produce similar issues that present as hanging tasks. My Macbook does not seem to show issues.
I will try reinstalling |
That's good to know. I was just afraid somehow you or I had different local code. For me I'm not able to reproduce like on your Macbook. If you have any other reprexes, please do share. It might be useful.
Thanks, re-installing the packages will just help confirm. Sorry what I meant by corrupted was perhaps the installed bytecode somehow linked outdated dependencies.
Error 5 would be if client thinks there is a connection but dispatcher has crashed for instance or network is otherwise unavailable. Error 7 means that dispatcher has actually closed the socket so the client knows this was an intentional disconnection (could happen if for example the dispatcher tries to listen to an invalid URL, errors and exits 'gracefully'). Potentially symptoms of the same problem - I wouldn't read too much into the distinction at present. |
On this one, I see probably nearer what you're seeing - almost all get collected, bar about 10 unresolved at the end. Sometimes literally it stops at Just want to check on the scaling logic - at the end all the servers get reinstated repeatedly I guess, as the queue is not empty and I guess that is what is keeping the cores busy. If it is not currently the case, try reinstating ALL of them as a test - could it be that the unresolved ones are stuck at one server that is somehow being left out? |
Glad you see what I am seeing.
I tried reinstalling nanonext and mirai on both the SGE cluster and local Ubuntu machine, then running the big
Do you mean rescaling them up to the full 20 servers instead of just |
Precisely, I mean without knowing more, it looks the same as if the mirai were sent to a URL where there is no active server instance and hence they just remain waiting for a server to come online. Meanwhile I am seeing if I can make the daemons control socket more robust to rule that out. |
I have committed c6f23c7 as v0.8.7.9003. This upgrades the bus sockets to req/rep. While the Req/rep should prevent this. As the requests are synchronous i.e. one after the other, I should not think I need to be even safer and wrap everything in contexts - the global socket context should be sufficient. While this does not solve the hanging per se, it should hopefully eliminate this type of error. Please have a try with your initial reprex removing Hopefully it brings us one step closer. Thanks! |
I believe this is the issue! Using the 'big targets pipeline' #75 (comment). If I interrupt when it hangs, then manually launch servers using |
Now that you mention it, I think this is part of the picture. I just reran the original motivating big It's odd though, a single re-launched worker should have been enough to run a single task. 50 is too many. |
I ran it again, and this time I got 5 stuck tasks. When I ran In the SGE cluster case, I think restarting all possible workers will cause the remaining stuck tasks to run. But I don't think it should require that many. If there are 3 stuck tasks and I submit 3 workers, those 3 workers should be enough. It feels like some tasks may be dispatched to the incorrect websocket URLs. It makes sense as a hypothesis because when I re-launch more workers, it increases the number of URLs that can accept tasks, so it's more of a chance that the dispatcher will get lucky if it picks the wrong URL. |
Yeah, calling |
This leaves us tantalisingly close. We've confirmed the hypothesis that we are simply launching the wrong servers - I'll explain below. This can happen because a mirai can already be assigned to a particular server (though this is not always the case) [1] - in a nutshell, because we are relying on a bit of NNG's reliable delivery magic [2]. If so, that particular server must be re-launched when scaling, otherwise the task will just remain there and never get done. Luckily it is a straightforward solution. All that's needed is to add one step to Once implemented, this issue falls away.
-- Details below for additional explanation. If any of it is not clear just let me know.
[1] If a task is sent to a server, but that server then shuts down by timing out etc., that task is retained at the NNG library level at that socket, and when a new instance connects it is resent automatically. This could happen not just because there is an 'exitlinger' period: it may also not be possible to ensure a server closes its connection (and for this to get registered at dispatcher) before dispatcher has had the chance to send it the next task, as everything happens asynchronously.
[2] It is possible to override this but then any solution will (i) necessarily be more brittle and potentially introduce a source of bugs and (ii) be less performant, as we'd be attempting to re-implement in R what is already part of the C library.
[3] The cumulative stats are needed, not just the snapshot for each instance, as 'assigned' must equal 'complete' over all instances but not necessarily for any particular instance. Using a max one task server as an example: first instance, 2 assigned, 1 complete. assigned > complete so re-launch this server. Second instance: 0 assigned, 1 complete (server finishes the previous task, doesn't get sent a new one) or 1 assigned, 1 complete (server finishes the previous task, also gets sent a new one). In neither of those cases is assigned > complete, but in the second case you'd want to re-launch as there is a task waiting to complete at that server. |
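To make the arithmetic in the max-one-task example concrete, here is a minimal base R sketch of the comparison. It does not call mirai or crew; the function name `needs_relaunch` and the vectors of per-instance counts are hypothetical stand-ins for whatever bookkeeping a launcher keeps for one websocket URL.

```r
# Hypothetical helper (not part of mirai or crew): decide whether the server
# at one URL should be re-launched, given the 'assigned' and 'complete'
# counts recorded for each instance that has connected at that URL.
needs_relaunch <- function(assigned, complete) {
  # Compare totals over all instances, not the latest snapshot.
  sum(assigned) > sum(complete)
}

# Case A: the second instance finishes the retained task and gets no new one.
case_a <- needs_relaunch(assigned = c(2L, 0L), complete = c(1L, 1L))

# Case B: the second instance finishes the retained task and is sent another.
case_b <- needs_relaunch(assigned = c(2L, 1L), complete = c(1L, 1L))

# The latest snapshot alone (1 assigned, 1 complete) would miss case B.
snapshot_b <- 1L > 1L

print(c(case_a = case_a, case_b = case_b, snapshot_b = snapshot_b))
```

In this sketch only case B asks for a re-launch, which matches the reasoning above: the totals differ by one task even though the last instance's own counts are equal.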
I am eager to try this, thanks for explaining. I will have questions about counters/stats later today, but for now I will ask: when I relaunch a server in this particular scenario, where a task is still in NNG, should I avoid calling saisei() first? saisei() is super helpful in preventing conflicts with servers that time out on launch, so I always call it on relaunch. But it rotates the websocket URL, which seems like it may orphan a task which is permanently glued to a particular websocket at the NNG level. |
That we solved before - as |
Great to know, that simplifies things.
Are the
Yes, I have observed this happening sometimes. If I see a server like this that is also "inactive" (online = 0 and either instance > 0 or launching timed out) I can make it a top priority for relaunch.
So this is where the "cumulative" part comes in, right? Since the stats reset on relaunch, I will need to sum the existing stats with the cumulative stats before the last reset in order to reason about assigned tasks. |
I guess 'yes' is the answer. If you see online and instance as '0' with stats for 'assigned' and 'complete' then you know those are the 'cumulative' ones for the instance that just ended. When a new instance connects those are zeroed out.
Just so you are confident in what is going on here - at dispatcher when a task is sent to the server, 'assigned' is incremented. The receive is asynchronous, and can complete at any time thereafter. Now if the server times out and the instance changes, the task is retained and resent at the NNG level as I mentioned - this part is not seen by R hence it is not logged. When the result is eventually sent back, the receive completes - at this point 'complete' is incremented. That is why for any individual instance, you can have 'assigned' being different to 'complete'.
Precisely. |
Sorry just so there's no confusion - what I mean is actually if you see online as '0' with stats for 'assigned' and 'complete' then you know those are the 'cumulative' ones for the instance that just ended. If you call Then if a server is launched and connects into that URL, online and instance both turn to '1' and the 'assigned' and 'complete' stats are zeroed out. |
I think I might hit a race condition trying to update the cumulative versions of
It would be convenient to update the stats during (1) when polling happens, but then new tasks could complete between (1) and the next call to
Step (5) is needed in case any tasks complete between (1) and This is all technically possible to implement, but it doubles the amount of polling required. Is there a more efficient way to keep track of the cumulative counts? Could |
I am not sure if the following works: poll daemons() and update the cumulative stats only for those servers where online is '0' and instance is '1' ie for ended instances. We know for these it is not possible for any updates to the stats to occur as no server is connected at this point. For all of these call saisei(). This changes the URL and zeroes instance. This means next time daemons() is polled you don't add the stats again as instance is not '1'. Actually only launch servers for those instances where you determine you need to.
This would be equally inefficient, as getting the stats involves a separate call to dispatcher equivalent to daemons(). I have made the 2 completely separate as one returns integer information and one character information. FYI I am retrieving real binary vectors here not a serialised R object, so these necessitate 2 separate nanonext sends. I wouldn't be worried too much about additional calls as they have already been made as lean as possible. If it's easier in terms of logic to do them, I'd just go ahead. |
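The bookkeeping proposed here can be sketched in base R with simulated polls. This is only an illustration under stated assumptions: the data frames stand in for the per-server status that a daemons() poll reports, `fold_ended` is a hypothetical helper, and the effect of saisei() is simulated by zeroing `instance` rather than by calling mirai.

```r
# Hypothetical helper: add the stats of ended instances (online = 0,
# instance = 1) to running totals, and report which URLs to rotate.
fold_ended <- function(cumulative, poll) {
  ended <- poll$online == 0L & poll$instance == 1L
  cumulative$assigned[ended] <- cumulative$assigned[ended] + poll$assigned[ended]
  cumulative$complete[ended] <- cumulative$complete[ended] + poll$complete[ended]
  list(cumulative = cumulative, rotate = which(ended))
}

# Running totals for three servers, all starting at zero.
cumulative <- data.frame(assigned = c(0L, 0L, 0L), complete = c(0L, 0L, 0L))

# Simulated poll: server 1 is live, server 2 has ended, server 3 never connected.
poll1 <- data.frame(
  online   = c(1L, 0L, 0L),
  instance = c(1L, 1L, 0L),
  assigned = c(4L, 2L, 0L),
  complete = c(3L, 1L, 0L)
)
step1 <- fold_ended(cumulative, poll1)

# Simulate the described effect of saisei(): instance is zeroed for rotated URLs,
# so the same stats are not added again on the next poll.
poll2 <- poll1
poll2$instance[step1$rotate] <- 0L
step2 <- fold_ended(step1$cumulative, poll2)

# Launch only where the cumulative totals show an outstanding task.
backlog <- step2$cumulative$assigned - step2$cumulative$complete
relaunch <- which(backlog > 0L)
print(relaunch)
```

The second call changes nothing, which is the point of keying the update on `instance`: once the URL has been rotated, the ended instance's counts cannot be double counted.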
I think this almost works. Besides servers with online = 0 and instance = 1, there may be lurking servers with online = 0 and instance = 0 which launch and complete tasks between daemons() and saisei(). I think I could poll, saisei() these "lost" workers, poll again, and then update the cumulative stats for servers with online = 0 and instance = 1. Unless... would I ever see instance = 0 with assigned > 0 or complete > 0? |
Is that true? I thought the use of saisei() prevented this. As soon as saisei() is called the URL changes and also instance is set to 0. So even if there is a lurking server it will just die - it will never connect or affect the stats. So every time you see online as 0 and instance as 1, you know a server has come and gone, irrespective of whether it has completed any tasks. Don't forget you only ever launch servers after calling saisei() first. Does that make sense? |
Sorry I missed this bit. No you wouldn't, instance would always be 1 once a server connects and there must be a server for there to be assigned or completed tasks. Not until you call saisei(), at that point the instance does zero out. |
True, but in order to know which servers need
To prevent the bug in (6), I think
Very helpful, thanks. |
Thanks for laying it out so clearly. I agree with your reasoning. You're getting really good at these! I'd happily make the extra daemons() call at 5. As I mentioned, I've already updated these to be as efficient as possible to avoid serialisation/deserialisation costs and minimise the amount of bytes actually transmitted. 2 calls probably equals 1 call previously! |
As of cfb4ce2, the hanging with long pipelines is solved! Thank you so much for the constant help, I could not have figured it out on my own. I still get sporadic instances of |
What an outcome! It only took some 200-odd GitHub messages across this issue as well as shikokuchuo/mirai#53 and shikokuchuo/mirai#58! |
Tasks hang when running this example. The mirais show as "unresolved". And if I wait long enough, I see