Performance of {crew} vs {clustermq} #81

Closed
wlandau opened this issue May 23, 2023 · 41 comments

@wlandau
Owner

wlandau commented May 23, 2023

Because of the blazing speed of mirai, I think crew has the potential to reach speeds comparable to clustermq. But so far, crew version 0.2.1 looks to be slower.

crew performance

In the following example, the timed part of the code took 10.477 seconds to complete on my 4-core Ubuntu machine.

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
names <- character(0L)
index <- 0L
n_tasks <- 6000L
system.time(
  while (index < n_tasks || !(controller$empty())) {
    if (index < n_tasks) {
      index <- index + 1L
      controller$push(
        name = as.character(index),
        command = TRUE
      )
    }
    out <- controller$pop()
  }
)
controller$terminate()

Replacing system.time() with proffer::pprof(), it looks like task management is the bottleneck. There may be a way to rework the R code to make this more efficient.

[Image: crew flame graph]

clustermq performance

The equivalent clustermq example only took 3.060 seconds.

library(clustermq)
options(clustermq.scheduler = "multicore")
f <- function(i) {
  crew::crew_eval(quote(TRUE))
}
system.time(out <- Q(fun = f, i = seq_len(6000), n_jobs = 4, verbose = FALSE))

And the flame graph:

Screenshot_20230523_081514

@wlandau wlandau self-assigned this May 23, 2023
@wlandau
Owner Author

wlandau commented May 23, 2023

Looking at the crew flame graph, I wonder if easy speedups can be reached if I use:

  1. .subset() instead of $ (and something equally fast to replace $<-), and
  2. Lists instead of tibbles for task output.

@wlandau
Owner Author

wlandau commented May 23, 2023

.subset2() seems to be able to select fields of R6 classes.
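
A minimal sketch of the idea, assuming only that an R6 object is an environment with a class attribute. The object and class name below are hypothetical stand-ins built in base R, not real crew classes, and the sketch shows equivalence of the two access paths rather than any timing.

```r
# Hypothetical stand-in for an R6 object: an environment with a class attribute.
obj <- new.env()
obj$count <- 5L
class(obj) <- "hypothetical_counter"

# `$` considers S3 dispatch because the object has a class attribute.
via_dollar <- obj$count

# .subset2() performs the same lookup without method dispatch.
via_subset2 <- .subset2(obj, "count")
```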

@nviets

nviets commented May 23, 2023

I think R itself is going to be the limiting factor. Are there any bits that could be rewritten in C++?

@brendanf

It seems like the largest single chunk is the updates to self$log.

If I'm not mistaken, you can use get() and assign() on an R6 object as replacements for $ and $<-, which avoids dispatch. However, I don't know of any way in R to avoid using $ or [[ on the left-hand side of something like self$log$popped_tasks[index] <- self$log$popped_tasks[index] + 1L. (Except of course doing it in C or C++.)
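
A rough sketch of the get()/assign() pattern, using a plain environment as a hypothetical stand-in for self. It still uses $ on the left-hand side, but only on an unclassed local copy of the list. Whether this is actually faster in crew is an assumption that would need profiling.

```r
# Hypothetical stand-in for `self`: an environment holding a `log` list.
self <- new.env()
self$log <- list(popped_tasks = c(0L, 0L, 0L))
index <- 2L

# Read the field with get(), update a local copy, write it back with assign().
log_copy <- get("log", envir = self, inherits = FALSE)
log_copy$popped_tasks[index] <- log_copy$popped_tasks[index] + 1L
assign("log", log_copy, envir = self)
```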

As an aside, should this

self$log$popped_errors[index] <-
  self$log$popped_errors[index] + !anyNA(out$error)
self$log$popped_warnings[index] <-
  self$log$popped_warnings[index] + !anyNA(out$error)

be out$warning at the end of the last line?

@shikokuchuo
Contributor

I noticed you classed your crew_monad but it seems only to add a print method. If you don't class it, you can use [[ on it without the extra method dispatch time, rather than the clunky .subset2(). This applies generally.

Using lists would be faster than tibbles, but it seems that would negate some of the appeal of the nice interface that crew offers.

@shikokuchuo
Contributor

I'll offer up my fast df/tibble creator. If you are creating the tibbles yourself, you can cut out all the validation code for a dramatic speedup. All you need is to add the following attributes to a list: (i) names, (ii) class, (iii) row.names. Illustrative example below.

cols <- 8L
colnames <- letters[seq_len(cols)]
rows <- 100L

df <- vector(mode = "list", length = cols)
for (i in seq_along(df))
  df[[i]] <- runif(rows)

attributes(df) <- list(names = colnames,
                       class = c("tbl_df", "tbl", "data.frame"),
                       row.names = .set_row_names(rows))

Comes courtesy of ichimoku::matrix_df(), a fast matrix to data frame converter.
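
The same trick can be wrapped in a small helper. This is only a sketch: fast_df() is a hypothetical name, it uses the plain "data.frame" class so it runs without tibble installed, and it skips all validation, so the caller must guarantee a named list of equal-length columns.

```r
# Hypothetical helper: turn a named list of equal-length columns into a
# data frame by setting attributes directly, with no validation at all.
fast_df <- function(columns) {
  rows <- length(columns[[1L]])
  attributes(columns) <- list(
    names = names(columns),
    class = "data.frame",
    row.names = .set_row_names(rows)
  )
  columns
}

out <- fast_df(list(name = c("a", "b"), result = c(1, 2)))
```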

@wlandau
Owner Author

wlandau commented May 24, 2023

Thanks everyone, this is such helpful advice. Changing/limiting interactions with tibbles seems to be improving things already.

@wlandau
Owner Author

wlandau commented May 24, 2023

I also wonder how much speed can be gained by simply removing the class attribute of all the R6 classes. This would probably require a downstream update to targets, which would take time to roll out at this point, but it would get there eventually.

@wlandau
Owner Author

wlandau commented May 24, 2023

I also wonder how much speed can be gained by simply removing the class attribute of all the R6 classes.

Doesn't seem to make a difference.

@wlandau
Owner Author

wlandau commented May 24, 2023

At this point, this is what the flame graph looks like on my Macbook:

Screenshot 2023-05-24 at 1 26 29 PM

And maybe the remaining bottlenecks come from the fact that crew is polling-based and single-task-based, so methods push(), pop(), and empty() need to be called a lot.

@wlandau
Owner Author

wlandau commented May 24, 2023

A more fair comparison is the following test, and it runs in about 3.4 seconds locally (the system.time() part):

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
names <- character(0L)
index <- 0L
n_tasks <- 6000L
Sys.sleep(5)
system.time({
  for (task in seq_len(n_tasks)) {
    controller$push(
      name = as.character(index),
      command = TRUE,
      scale = FALSE,
      seed = 0L
    )
  }
  while (length(controller$queue) > 0L) {
    controller$collect()
  }
})
controller$terminate()

@shikokuchuo
Contributor

shikokuchuo commented May 24, 2023

At this point, this is what the flame graph looks like on my Macbook:

Screenshot 2023-05-24 at 1 26 29 PM

And maybe the remaining bottlenecks come from the fact that crew is polling-based and single-task-based, so methods push(), pop(), and empty() need to be called a lot.

Since sample() showed up on the chart, I did a quick search, as I know it to be slow. It is used in a few places to set the seed to a random number:

seed = sample.int(n = 1e9L, size = 1L)

You can simply replace it with

seed = NULL

as set.seed(seed = NULL) re-initialises the RNG.

or if you need to record the actual seed, you could use

seed = as.integer(random() / 2)

for a c. 8x speed up. The range of nanonext::random() is from 0 to 2x max integer value (as the underlying C value is a 32-bit unsigned int).
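
The division by 2 matters because R integers are signed 32-bit values. The arithmetic can be checked in base R without nanonext; the sketch below assumes only the stated range of 0 to 2^32 - 1 and uses the upper bound as a worked example.

```r
# Largest value of a 32-bit unsigned integer, held as a double.
max_uint32 <- 2^32 - 1

# Converting it directly overflows R's signed integer type and yields NA.
direct <- suppressWarnings(as.integer(max_uint32))

# Halving first keeps even the largest value inside the integer range.
halved <- as.integer(max_uint32 / 2)
```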

@wlandau
Owner Author

wlandau commented May 24, 2023

Thanks, @shikokuchuo. Implemented just now.

My original thinking was that the default seed should be tied to the RNG state of the calling session, but I think crew can let go of that because the submission/collection order of tasks is not reproducible anyway.

@wlandau
Owner Author

wlandau commented May 24, 2023

After 7cb5111, the example from #81 (comment) looks like this (on the Macbook):

Screenshot 2023-05-24 at 4 51 17 PM

collect() is faster now because of improvements to vectorization. This makes me think crew controllers should have multi-push and multi-pop methods. Maybe that's the right way to handle functional programming, as opposed to a purrr-like package (cf. #62).

@wlandau
Owner Author

wlandau commented May 24, 2023

Moving to a more focused issue about multi-push and multi-pop.

@wlandau
Owner Author

wlandau commented May 24, 2023

On second thought, efficiency improvements may also require a more efficient approach to queueing on crew's end.

@wlandau wlandau reopened this May 24, 2023
@wlandau
Owner Author

wlandau commented May 25, 2023

I should probably try using environments/hash tables for the queues instead of lists that get incremented 1 task at a time.

@brendanf

x[1] <- NULL does result in a complete copy of x in every iteration; you are presumably also causing a copy every time an element is added. Assuming my computer is about as fast as yours, this is a few percent of the total time. (I don't know the whole structure of the list elements, so I just made something up which is more complicated than a single number.)

> x <- rep(list(list(error = NULL, result = 3, warning = "")), 6000)
> system.time(while (length(x) > 0) x[1] <- NULL)
   user  system elapsed 
  0.117   0.010   0.127

How would you implement a queue in an environment? My first stab at it is much slower than the list version:

> x <- new.env()
> for (i in 1:6000) assign(sprintf("q%04d", i), list(error = NULL, result = 3, warning = ""), x)
> system.time(while (length(x) > 0) remove(list = ls(x)[1], pos = x))
   user  system elapsed 
  5.212   0.000   5.220 

The docs for ls() warn that a large fraction of the execution time is sorting the results; experiment verifies that this is the case, and using ls(sorted = FALSE) is much faster than the list version. This one also includes timing on filling the queue:

> x <- new.env()
> system.time(for (i in 1:6000) assign(sprintf("q%04d", i), list(error = NULL, result = 3, warning = ""), x))
   user  system elapsed 
  0.016   0.000   0.016
> system.time(while (length(x) > 0) remove(list = ls(x, sorted = FALSE)[1], pos = x))
   user  system elapsed 
  0.002   0.000   0.003

If the results aren't coming out sorted, then it's not really a queue... but since we are doing asynchronous execution anyway, maybe it doesn't matter if pop() is FIFO.

Using a non-hash environment seems to yield a stack, but then it's slower than the list version:

> x <- new.env(hash = FALSE)
> system.time(for (i in 1:6000) assign(sprintf("q%04d", i), list(error = NULL, result = 3, warning = ""), x))
   user  system elapsed 
  0.119   0.000   0.119
> ls(x, sort = FALSE)[1]
[1] "q6000"
> ls(x, sort = FALSE)[6000]
[1] "q0001"
> system.time(while (length(x) > 0) remove(list = ls(x, sorted = FALSE)[1], pos = x))
   user  system elapsed 
  0.690   0.007   0.699

And popping the last element rather than the first (so it's FIFO) is even slower:

> system.time(while (length(x) > 0) remove(list = ls(x, sorted = FALSE)[length(x)], pos = x))
   user  system elapsed 
  1.001   0.000   1.002

@wlandau
Owner Author

wlandau commented May 25, 2023

x[1] <- NULL does result in a complete copy of x in every iteration; you are presumably also causing a copy every time an element is added.

That certainly explains a lot.

How would you implement a queue in an environment?

I found [[<- to be much faster than assign(), and I found names() to be much faster than ls() even when sorted = FALSE. As of f8290b5, I am using environments as hash tables for controller$queue and controller$results, where mirai objects are stored. Now, as far as adding/removing objects in those environments, the only non-instantaneous part seems to be crew::crew_random_name() to generate unique IDs. There is still an overall bottleneck, but more on that later.
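
As a rough sketch of that approach (not crew's actual implementation), an environment can act as a keyed store with [[<- for insertion and names() for listing keys. The task names and the pop() helper are hypothetical, and the pop order is whatever names() returns, so it is not guaranteed to be FIFO.

```r
# Hypothetical queue: a hashed environment used as a key-value store.
queue <- new.env(hash = TRUE, parent = emptyenv())

# Push: assign by key with [[<- rather than assign().
for (i in seq_len(3L)) {
  queue[[sprintf("task%04d", i)]] <- list(error = NULL, result = i, warning = "")
}

# Pop: take one key from names(), read its value, then remove the entry.
pop <- function(queue) {
  key <- names(queue)[1L]
  value <- queue[[key]]
  rm(list = key, envir = queue)
  value
}

popped <- pop(queue)
remaining <- length(queue)
```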

@wlandau
Owner Author

wlandau commented May 25, 2023

As I mentioned, in f8290b5 I switched crew to using environments to store mirai objects. It's not that much faster, but it does nicely consolidate all the slowness into a single identifiable bottleneck.

Now when I run:

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
Sys.sleep(5)
index <- 0L
n_tasks <- 6000L
Sys.sleep(5)
px <- proffer::pprof({
  for (task in seq_len(n_tasks)) {
    controller$push(
      name = as.character(index),
      command = TRUE,
      scale = FALSE,
      seed = 0L
    )
  }
  while (length(controller$queue) > 0L) {
    controller$collect()
  }
})
controller$terminate()

I see the same clear bottleneck in both my Macbook and Ubuntu machines (although slightly less severe on the Macbook for some reason):

[Image: flame graph output]

This bottleneck happens in the following lines of code. Below, queue is a new.env(hash = TRUE, parent = emptyenv()) environment containing mirai tasks. (And for what it's worth, crew gives each mirai object special attributes like "name" and "command" for internal bookkeeping purposes.)

crew/R/crew_controller.R

Lines 391 to 398 in f8290b5

not_done <- vapply(
  X = names(queue),
  FUN = function(id) {
    .unresolved(queue[[id]])
  },
  FUN.VALUE = logical(1L),
  USE.NAMES = TRUE
)

eapply(queue, .unresolved) is a more concise version of the above, but it's a tiny bit slower.

I don't know if there is a way in R to loop over an environment of mirai objects any faster, but if I believe the flame graph, then a solution to this bottleneck would make crew up to 8x faster in clustermq-like cases.

@shikokuchuo
Contributor

This bottleneck happens in the following lines of code. Below, queue is a new.env(hash = TRUE, parent = emptyenv()) environment containing mirai tasks. (And for what it's worth, crew gives each mirai object special attributes like "name" and "command" for internal bookkeeping purposes.)

crew/R/crew_controller.R

Lines 391 to 398 in f8290b5

not_done <- vapply(
  X = names(queue),
  FUN = function(id) {
    .unresolved(queue[[id]])
  },
  FUN.VALUE = logical(1L),
  USE.NAMES = TRUE
)

eapply(queue, .unresolved) is a more concise version of the above, but it's a tiny bit slower.

I don't know if there is a way in R to loop over an environment of mirai objects any faster, but if I believe the flame graph, then a solution to this bottleneck would make crew up to 8x faster in clustermq-like cases.

I shouldn't think the hashed environment gives you much advantage as you don't need to retrieve or replace values by name/key.

I might be over-simplifying, but I think you can revert to using a list, and then it seems just this will do:

as.logical(lapply(queue, .unresolved))

Or refactor your code so that's all you need?

.unresolved() is guaranteed to return TRUE or FALSE in any case.
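
A small base R sketch of that one-liner, with a hypothetical predicate standing in for .unresolved() so it runs without mirai. It relies on the predicate always returning a single TRUE or FALSE, which is the guarantee stated above.

```r
# Hypothetical stand-in for .unresolved(): TRUE while a task has no result yet.
is_unresolved <- function(task) is.null(task$result)

queue <- list(
  a = list(result = 1),
  b = list(result = NULL),
  c = list(result = 3)
)

# A list of length-one logicals collapses to a plain logical vector.
flat <- as.logical(lapply(queue, is_unresolved))

# The type-checked equivalent, for comparison.
typed <- vapply(queue, is_unresolved, FUN.VALUE = logical(1L), USE.NAMES = FALSE)
```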

@brendanf

eapply(queue, .unresolved) is a more concise version of the above, but it's a tiny bit slower.

That is depressing; this seems like the prototypical use case for eapply(). Presumably it's because it doesn't have an option to specify the type of the result (as vapply() does), so it can't preallocate.

I don't know if there is a way in R to loop over an environment of mirai objects any faster, but if I believe the flame graph, then a solution to this bottleneck would make crew up to 8x faster in clustermq-like cases.

Does that mean that you think the actual calls to .unresolved() should be much faster? If the potential savings is just the rest of vapply() then it looks more like a 2x speedup.

Did you try vapply(X=as.list(queue), FUN = .unresolved, FUN.VALUE = logical(1L), USE.NAMES = TRUE)?

@brendanf

I think that my suggestion would require allocating an extra list for the result of as.list(). The question is whether that is faster than looking up every element of the queue by name.

@wlandau
Owner Author

wlandau commented May 25, 2023

I just tried not_done <- lapply(X = as.list(queue), FUN = .unresolved), and the example yields the following flame graph on my Macbook:

Screenshot 2023-05-25 at 12 08 36 PM

as.list(queue) is only about 16% of the execution time, and there are other bottlenecks as well.

@shikokuchuo
Contributor

I just tried not_done <- lapply(X = as.list(queue), FUN = .unresolved), and the example yields the following flame graph on my Macbook:

Screenshot 2023-05-25 at 12 08 36 PM

as.list(queue) is only about 16% of the execution time, and there are other bottlenecks as well.

Forget my suggestion about reverting to lists - I forgot you were trying to avoid the other duplication costs. Just on the above, lapply() should be able to iterate over the environment without needing as.list(), although I'm not at a computer to check at the moment.

@brendanf

I just tried not_done <- lapply(X = as.list(queue), FUN = .unresolved), and the example yields the following flame graph on my Macbook:

Screenshot 2023-05-25 at 12 08 36 PM

as.list(queue) is only about 16% of the execution time, and there are other bottlenecks as well.

Forget my suggestion about reverting to lists - I forgot you were trying to avoid the other duplication costs. Just on the above, lapply() should be able to iterate over the environment without needing as.list(), although I'm not at a computer to check at the moment.

You're right! vapply() and lapply() also work on environments. I never knew that!
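
For reference, a base R sketch comparing the three ways of looping over an environment discussed here. The predicate is a hypothetical stand-in for .unresolved(), and results are compared by name because the iteration order over an environment is not guaranteed.

```r
# Hypothetical stand-in for .unresolved().
is_unresolved <- function(task) is.null(task$result)

queue <- new.env(hash = TRUE, parent = emptyenv())
queue[["a"]] <- list(result = 1)
queue[["b"]] <- list(result = NULL)

# 1. Look up every element by name.
by_name <- vapply(
  names(queue),
  function(id) is_unresolved(queue[[id]]),
  FUN.VALUE = logical(1L)
)

# 2. Pass the environment directly; vapply() converts it with as.list().
direct <- vapply(queue, is_unresolved, FUN.VALUE = logical(1L))

# 3. eapply() returns a named list, flattened here with unlist().
via_eapply <- unlist(eapply(queue, is_unresolved))
```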

@brendanf

How is the speed comparing to clustermq at this point?

@wlandau
Owner Author

wlandau commented May 25, 2023

Forget my suggestion about reverting to lists - I forgot you were trying to avoid the other duplication costs. Just on the above, lapply() should be able to iterate over the environment without needing as.list(), although I'm not at a computer to check at the moment.

Yes, great point. Further profiling shows lapply() converts the environment to a list behind the scenes anyway, so the flame graph looks the same. But the bottleneck is much more concise now, which is really helpful.

not_done <- lapply(X = queue, FUN = .unresolved)

How is the speed comparing to clustermq at this point?

On my Macbook, crew is still a little slower than clustermq, about 3.5-4 seconds for the most recent example. I haven't tried the latest crew on my Ubuntu machine, but lately it has been about 8 seconds or so, mainly due to more time spent in that lapply()/eapply() bottleneck.

@wlandau
Owner Author

wlandau commented May 25, 2023

You know what? Maybe I am optimizing the wrong thing, or reading the profiler output wrong. If I add a sleep to let the tasks finish:

for (task in seq_len(n_tasks)) {
  controller$push(
    name = as.character(index),
    command = TRUE,
    scale = FALSE,
    seed = 0L
  )
}
Sys.sleep(3)
while (length(controller$queue) > 0L) {
  controller$collect(throttle = TRUE)
}

then the controller$collect() "bottleneck", including lapply()/eapply(), vanishes:

Screenshot 2023-05-25 at 1 59 58 PM

Alternatively, when I turn on throttling in collect():

  for (task in seq_len(n_tasks)) {
    controller$push(
      name = as.character(index),
      command = TRUE,
      scale = FALSE,
      seed = 0L
    )
  }
  while (length(controller$queue) > 0L) {
    controller$collect(throttle = TRUE)
  }

then the "bottleneck" appears to be in length(controller$queue):

Screenshot 2023-05-25 at 2 05 28 PM

These may not be bottlenecks at all. They may simply be the places where the profiler happens to be sampling while crew waits for mirai tasks to resolve.
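
The effect can be reproduced without crew. In this sketch a hypothetical "task" resolves 0.2 seconds after the script starts, and the loop does nothing but poll for it. Each check is cheap, yet a sampling profiler would attribute nearly all of the elapsed time to the check because that is the only code running during the wait.

```r
# Hypothetical stand-in for a task that resolves 0.2 seconds from now.
done_at <- Sys.time() + 0.2
is_done <- function() Sys.time() >= done_at

# Poll as fast as possible until the task is done, counting the checks.
polls <- 0L
while (!is_done()) {
  polls <- polls + 1L
}
```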

But I am pretty sure deparse_safe() and crew_random_name() are true profiler findings, so I will try to find faster alternatives to those.

@wlandau
Owner Author

wlandau commented May 25, 2023

After solving #83, here is what I see in the flame graph after returning to the original crew example from #81 (comment).

[Image: new flame graph]

I do not believe controller$empty() is a real bottleneck because all it does is check the length of a couple environments. I am pretty sure it is just waiting for tasks at that point.

crew/R/crew_controller.R

Lines 149 to 151 in ef35e06

empty = function(controllers = NULL) {
  (length(self$queue) < 1L) && (length(self$results) < 1L)
},

@wlandau
Owner Author

wlandau commented May 25, 2023

Speeds are looking really good if we remove delays waiting for tasks. The following is on my Ubuntu machine, which has proved much slower than my Macbook for these tests.

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
Sys.sleep(5)


index <- 0L
n_tasks <- 6000L
system.time({
  for (index in seq_len(n_tasks)) {
    controller$push(command = TRUE)
  }
})
#>    user  system elapsed 
#>   0.571   0.022   1.122 

Sys.sleep(5)
system.time(controller$collect())
#>    user  system elapsed 
#>   0.024   0.000   0.025 

Sys.sleep(5)
system.time({
  for (index in seq_len(n_tasks)) {
    controller$pop()
  }
})
#>    user  system elapsed 
#>   0.570   0.000   0.614 

controller$terminate()

@wlandau
Owner Author

wlandau commented May 25, 2023

And the analogous flame graphs:

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
index <- 0L
n_tasks <- 6000L

When pushing tasks, deparse_safe() and rlang::call2() are the more noticeable bottlenecks.

Sys.sleep(5)
proffer::pprof({
  for (index in seq_len(n_tasks)) {
    controller$push(command = TRUE)
  }
})

[Image: flame graph 1]

collect() on 6000 completed tasks is a breeze.

Sys.sleep(5)
proffer::pprof(controller$collect())

[Image: flame graph 2]

And for pop(), I suppose I could think about replacing as.list() with something else, but users are probably not going to notice the difference.

Sys.sleep(5)
proffer::pprof({
  for (index in seq_len(n_tasks)) {
    controller$pop()
  }
})

[Image: flame graph 3]

@shikokuchuo
Contributor

If we're in the business of eking out gains, one thing before I forget these details - you may pass the 'data' variables in crew_eval() to the '...' argument in mirai() again - these get used directly, whereas there is some language manipulation going on for using .args.

@wlandau
Owner Author

wlandau commented May 25, 2023

Thanks! I implemented your suggestion and replaced rlang::call2() with base::as.call(). Both seem to really help (0.895s, down from 1.122s, in the push() phase).
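
For readers unfamiliar with base::as.call(), here is a minimal sketch of building and evaluating a call from a function name and a list of arguments. The function and arguments are illustrative only and are not the calls crew constructs.

```r
# Build the call sum(1, 2) from a symbol and a list of arguments.
arguments <- list(1, 2)
expr <- as.call(c(list(as.symbol("sum")), arguments))

# Evaluate the constructed call.
value <- eval(expr)
```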

@wlandau
Owner Author

wlandau commented May 25, 2023

The only 2 other things I can think of are to

  1. Make deparsing and storing the command optional and turned off by default.
  2. Make crew monads lists instead of environments so they can be turned into tibbles quickly.

After that, I think I will have done all the optimization I can from crew's perspective.

@wlandau
Owner Author

wlandau commented May 25, 2023

After (2), the pop() phase in #81 (comment) only takes 0.249 seconds.

@shikokuchuo
Contributor

The only 2 other things I can think of are to

1. Make deparsing and storing the command optional and turned off by default.

Deparsing is expensive. Why not just store the call (language object) - it still prints.

2. Make crew monads lists instead of environments so they can be turned into tibbles quickly.

I like this idea!

@wlandau
Owner Author

wlandau commented May 25, 2023

Deparsing is expensive. Why not just store the call (language object) - it still prints.

From experience with targets, I don't like the way large sets of language objects consume memory. Probably not a huge deal, but it adds up to MBs sometimes.

@wlandau
Owner Author

wlandau commented May 25, 2023

Implemented (1). The push() phase is now 0.764s, and the flame graph is beautiful.

Screenshot_20230525_174648

@wlandau wlandau closed this as completed May 25, 2023
wlandau pushed a commit that referenced this issue May 25, 2023
@shikokuchuo
Contributor

Deparsing is expensive. Why not just store the call (language object) - it still prints.

From experience with targets, I don't like the way large sets of language objects consume memory. Probably not a huge deal, but it adds up to MBs sometimes.

I see, in which case, I can offer the following from mirai itself for when it deparses the call from a tryCatch error:

deparse(x, backtick = TRUE, control = NULL, nlines = 1L)

Specifying the arguments led to a real speedup (I forget of what magnitude). You can specify nlines = -1L if you want to ensure you capture absolutely everything.
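
A short sketch of what the nlines argument does, using a made-up multi-line command. It shows only how much text is returned, not the speed difference, which would need separate benchmarking.

```r
# A made-up command that spans several lines when deparsed.
command <- quote({
  x <- 1
  x + 1
})

# By default, deparse() returns one string per line of the expression.
all_lines <- deparse(command)

# With nlines = 1L, deparsing stops after the first line.
first_line <- deparse(command, backtick = TRUE, control = NULL, nlines = 1L)

# nlines = -1L removes the limit again.
everything <- deparse(command, backtick = TRUE, control = NULL, nlines = -1L)
```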

@wlandau
Owner Author

wlandau commented May 25, 2023

Thanks! I may take you up on that eventually. For now, however, the existing deparse_safe() is nearly as fast, and paste(deparse(x, backtick = TRUE, control = NULL, nlines = -1L)) seems to break my tests for reasons I do not understand.
