
Dynamic branching #685

Closed
wlandau opened this issue Jan 18, 2019 · 61 comments

@wlandau (Member) commented Jan 18, 2019

We want to declare targets and modify the dependency graph while make() is running. Sometimes, we do not know what the targets should be until we see the values of previous targets. The following plan sketches the idea.

library(dplyr)
library(drake)
drake_plan(
  summaries = mtcars %>%
    group_by(cyl) %>%
    summarize(mean_mpg = mean(mpg)),
  individual_summary = target(
    filter(summaries, cyl == cyl_value),
    transform = cross(cyl_value = summaries$cyl)
  )
)

Issues:

  1. How will outdated() work now? Do we have to read the targets back into memory to check whether the downstream targets are up to date?
  2. This is the biggest implementation challenge drake has faced. Hopefully the work will migrate to the workers package.
wlandau mentioned this issue Jan 18, 2019
wlandau changed the title from "Dynamic branching" to "Initial planning phase: dynamic branching" on Jan 20, 2019
@billdenney (Contributor)

One implementation idea that seems as though it could be "simple" to implement (says the guy who hasn't implemented anything in drake yet): Could there be a "plan in a plan" concept?

My thought is:

  • Generate outer_plan and inner_plan
  • Make a step in outer_plan that is: if (selection_criterion) make(inner_plan)
  • From an outdated() standpoint, you could add detection logic for "the inner_plan never needed to run": as long as selection_criterion is up to date, the inner plan could have a status of "not required to run". Or even simpler, if selection_criterion and inner_plan don't change, then I think outdated() would report outer_plan correctly without modification; the steps of inner_plan would simply never show up.
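The idea might be sketched like this (purely illustrative: inner_plan, selection_criterion, and the helper functions are made-up names, and nesting make() inside a command is not an endorsed drake pattern):

```r
library(drake)

# Hypothetical inner plan, built only when needed.
inner_plan <- drake_plan(
  detail = expensive_analysis(raw_data)
)

# The outer plan gates the inner make() behind a criterion target.
outer_plan <- drake_plan(
  selection_criterion = needs_detail(raw_data),
  inner_run = if (selection_criterion) make(inner_plan)
)
```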

@wlandau (Member, Author) commented Feb 20, 2019

A similar idea was proposed in #304. I would actually prefer to avoid nested plans because of the complexity and pre-planning they would require.

@billdenney (Contributor)

Fair enough. If I think of something else, I'll post it.

@brendanf (Contributor)

I want to point out the way Snakemake currently handles this, as a possible inspiration:

  • targets which will trigger a rebuild of the plan are declared differently (in Snakemake, as a "checkpoint" rather than a "rule")
  • dependencies which are conditional on the output of a checkpoint are marked by a call of the form checkpoints.<checkpoint_name>.get()

The second part is quite idiosyncratic to Python, so I wouldn't suggest implementing it the same way, but it seems easier to make the user explicitly mark the cases where dynamic branching needs to happen than to try to detect it from the structure of their dependencies.

Using your example:

library(dplyr)
library(drake)
drake_plan(
  summaries = mtcars %>%
    group_by(cyl) %>%
    summarize(mean_mpg = mean(mpg)),
  individual_summary = target(
    filter(summaries, cyl == cyl_value),
    transform = cross(cyl_value = dynamic(summaries$cyl))
  )
)

One clear difference is that, using an R-based framework rather than a file-based framework, the output of summaries is still only one object. In a file-based framework, it might have been an unknown number of files.

As I wrote that example, I realized that it's actually much more similar to how Snakemake used to do it. In the end, they decided against that way, so maybe it would be good to know why and learn from that. Perhaps it's not relevant in drake's framework though.

@wlandau (Member, Author) commented May 6, 2019

Discussion from #233 carries over to this thread.

@wlandau (Member, Author) commented May 20, 2019

Users really want this flexibility, and often just assume drake already supports it, but I am beginning to question this dream scenario. If we try to implement dynamic branching deeply in drake's internals, we would need to rip half the package apart and double the complexity. Even if we offload scheduling to workers, we would still be in trouble. We would need to update config$graph, config$queue (the priority queue), and config$layout all mid-make(). drake is simply not designed for this.

The more I think about it, the more wisdom I see in @krlmlr's thinking behind #304. Possible compromise: a new split() transformation. @kendonB's #833 certainly seems to address this use case, not to mention #77.

wlandau mentioned this issue May 20, 2019
@wlandau (Member, Author) commented May 23, 2019

Update: we now have split() in the dev version: https://ropenscilabs.github.io/drake-manual/plans.html#split. It should cover many use cases that would otherwise have required dynamic branching.

@wlandau (Member, Author) commented Jun 3, 2019

A more expedient approach

After talking with @dgkf at SDSS last week, I am no longer as reluctant as in #685 (comment). We can avoid a mess if we give dynamic branching its own DSL that works in tandem with the existing transformation DSL. This new dynamic DSL is just the transformation DSL invoked at runtime.

Proposal

library(drake)
plan <- drake_plan(
    vector_of_settings = target(
        f(x),
        transform = map(x = c(1, 2))
    ),
    analysis = target(
        g(x, y),
        transform = map(x),
        dynamic = map(y = vector_of_settings)
    )
)

print(plan)
#> # A tibble: 4 x 3
#>   target               command dynamic   
#>   <chr>                <expr>  <list>    
#> 1 vector_of_settings_1 f(1)    <lgl [1]> 
#> 2 vector_of_settings_2 f(2)    <lgl [1]> 
#> 3 analysis_1           g(1, y) <language>
#> 4 analysis_2           g(2, y) <language>

print(plan$dynamic)
#> [[1]]
#> [1] NA
#> 
#> [[2]]
#> [1] NA
#> 
#> [[3]]
#> map(y = vector_of_settings_1)
#> 
#> [[4]]
#> map(y = vector_of_settings_2)

drake_plan_source(plan)
#> drake_plan(
#>   vector_of_settings_1 = f(1),
#>   vector_of_settings_2 = f(2),
#>   analysis_1 = target(
#>     command = g(1, y),
#>     dynamic = map(y = vector_of_settings_1)
#>   ),
#>   analysis_2 = target(
#>     command = g(2, y),
#>     dynamic = map(y = vector_of_settings_2)
#>   )
#> )

Created on 2019-06-03 by the reprex package (v0.3.0)

When we create new targets, we probably do not need to register them in config$layout or the priority queue. Suppose make() is running: it has just built or checked vector_of_settings_1, and we are about to build analysis_1. Suppose vector_of_settings_1 evaluated to c("a", "b"). Then, we could

  1. Create new "target" names analysis_1_a and analysis_1_b.
  2. Create new commands for each.
  3. Submit those new commands to the scheduler as consecutive jobs, skipping outdated targets.
    • Need to check analysis_1_a and analysis_1_b individually, based on previously-cached metadata.
  4. Store a value and metadata list for each target.
  5. Store a value for target analysis_1 that
    1. Is human-readable, and
    2. Reacts to changes in analysis_1_a and analysis_1_b.
cache <- drake_cache() # successor of get_cache()
cache$get_hash("analysis_1_a")
#> [1] "1d5108bacae437a0"
cache$get_hash("analysis_1_b")
#> [1] "17b1fbe1609400b9"
readd(analysis_1)
#> target             hash
#> 1 analysis_1_a 1d5108bacae437a0
#> 2 analysis_1_b 17b1fbe1609400b9

Remarks

  • Can anyone think of a better name than dynamic for this new field?
  • We need to detect dependencies in the dynamic column (or whatever we name it) just like command and triggers.
  • dynamic should print as an <expr> column in the tibble just like command.

Thanks

This idea, along with the original DSL, was inspired by @krlmlr in #233 and #304.

@wlandau (Member, Author) commented Jun 3, 2019

Hmm... what about targets downstream of analysis_1_a and analysis_1_b?

@wlandau (Member, Author) commented Jun 3, 2019

Easy, actually: just give a special attribute (maybe an S3 class) to the analysis_1 value (the data frame of hashes). That way, when we load analysis_1 as a dependency of a dynamic transformation (say, dynamic = map(analysis_1)) we will know to map over analysis_1_a and analysis_1_b instead.
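A minimal base-R sketch of the idea (the hash values are the ones from the example above; the helper name is illustrative, though "drake_dynamic" is the class that later appears in the cache):

```r
# Sketch: tag the parent target's value (a data frame of sub-target hashes)
# with an S3 class so downstream code knows to map over the sub-targets.
analysis_1 <- data.frame(
  target = c("analysis_1_a", "analysis_1_b"),
  hash   = c("1d5108bacae437a0", "17b1fbe1609400b9"),
  stringsAsFactors = FALSE
)
class(analysis_1) <- c("drake_dynamic", "data.frame")

# Downstream, a dynamic transform can check the class before deciding
# whether to map over sub-targets.
is_dynamic <- function(x) inherits(x, "drake_dynamic")
is_dynamic(analysis_1)  # TRUE
```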

@wlandau (Member, Author) commented Jun 3, 2019

We also need to think about how the new target names and splits are constructed. If vector_of_settings_1 is a list of large objects, we need to make up sensible names. Hashes? They could be slow to compute. We should also make an effort to handle grouped (group_by()) data frames.

@wlandau (Member, Author) commented Jun 3, 2019

Come to think of it, we probably need a trace (drake_plan(trace = TRUE)) in those special data frames so that combine(.by) still works.

@brendanf (Contributor) commented Jun 4, 2019

> We also need to think about how the new target names and splits are constructed. If vector_of_settings_1 is a list of large objects, we need to make up sensible names. Hashes? could be slow. We should also make an effort to handle group_by() data frames.

The useful cases seem to be:

  • For a grouped data frame or indexed data table, use the grouping variables/index columns.
  • For a named vector/list, use the names, unique-ifying as needed.
  • For a non-named vector of character/integer/logical/factor values, use the values as names. Maybe also for numeric? complex? raw?
  • For other objects, default integer indices.
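A base-R sketch of this naming heuristic (the helper is hypothetical, not drake API; the grouped-data-frame case is omitted for brevity):

```r
# Pick default sub-target names for a vector or list, per the cases above.
default_names <- function(x) {
  if (!is.null(names(x))) {
    make.unique(names(x))            # named vector/list: unique-ify the names
  } else if (is.character(x) || is.integer(x) || is.logical(x) || is.factor(x)) {
    make.unique(as.character(x))     # use the values themselves as names
  } else {
    as.character(seq_along(x))       # other objects: integer indices
  }
}

default_names(c(a = 1, a = 2))           # "a" "a.1"
default_names(c("x", "y", "x"))          # "x" "y" "x.1"
default_names(list(rnorm(3), rnorm(3)))  # "1" "2"
```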

If users want the hash behavior, then they can use (and perhaps drake can provide?) a function which names a vector/list according to the hashes of its elements. e.g.

name_by_hash <- function(x, ...) {
  # digest is a CRAN package; hash each element and use the hashes as names
  n <- vapply(x, digest::digest, "", ...)
  names(x) <- n
  x
}

Alternatively, always default to integer indices, and if the user wants something smarter, they can specify it with .id = (allowing tidy evaluation, so that something like .id = !!vapply(vector_of_settings, digest::digest, "") would work).

@dpmccabe

Hi, I just wanted to describe another use case that would greatly benefit from dynamic branching. In my case, I have a very large data frame somewhat like this:

> d <- tibble(year = rep(2010:2015, each = 5), x = 1:30)
# A tibble: 30 x 2
    year     x
   <int> <int>
 1  2010     1
 2  2010     2
 3  2010     3
 4  2010     4
 5  2010     5
 6  2011     6
 7  2011     7
 8  2011     8
 9  2011     9
10  2011    10
# ... with 20 more rows

I'd like to be able to split d by year, and then create a new column f(x) := x + 100 in those batches (where f is actually an expensive function). Since I'm not often adding new rows to the data frame for very old years, I want to benefit from cached results for the 2010, 2011, ... splits and only recalculate for 2019, which is the only split that has changed/new data.

It would be a game-changer to be able to use drake like this, since more often than not I can think of a splitting scheme that would effectively partition the data into stale and up-to-date splits.
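With the dynamic branching interface that eventually shipped, this use case might look roughly like the following (a sketch only: expensive_f() is a placeholder, and the exact group()/.by semantics are documented in the manual's dynamic branching chapter):

```r
library(drake)
library(dplyr)

plan <- drake_plan(
  d = tibble(year = rep(2010:2015, each = 5), x = 1:30),
  years = d$year,
  # One sub-target per year; only sub-targets whose rows changed rebuild.
  batches = target(
    mutate(d, fx = expensive_f(x)),
    dynamic = group(d, .by = years)
  )
)
```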

@wlandau (Member, Author) commented Jul 9, 2019

@dpmccabe, I see what you mean. I just encountered a very similar situation for a project at work.

I am realizing that #685 (comment) has serious problems:

  1. It would be extremely difficult to make the current DSL behave exactly the same for dynamic branching, and the inevitable subtle differences would add a lot of confusion.
  2. To work around (1), we would need to make an entire new DSL just for dynamic branching, and that adds a lot of additional complexity for users.
  3. Either way, #685 (comment) could add a lot of complexity to the code base.

An alternative is @brendanf's suggestion of checkpointing (#685 (comment)). For drake, this essentially means turning a plan into a bunch of subplans. Take this plan as an example:

drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2),
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = vector_of_settings_1)
  ),
  analysis_2 = target(
    command = g(2, y),
    transform = map(y = vector_of_settings_2)
  )
)

It is already natural for users to think about it as two separate plans:

drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2)
)

drake_plan(
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = vector_of_settings_1)
  ),
  analysis_2 = target(
    command = g(2, y),
    transform = map(y = vector_of_settings_2)
  )
)

Maybe make() could do something similar: split a monolithic plan into chunks as appropriate, and then transform/make() those chunks in topological order.
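A checkpoint-style driver might be sketched as a loop over subplans (run_stages() is a hypothetical helper, not drake API):

```r
# Hypothetical: make() a list of subplans in topological order. Targets from
# earlier stages land in the cache, so later stages' transformations can
# expand over values that only exist at runtime.
run_stages <- function(stages) {
  for (stage in stages) {
    drake::make(stage)
  }
}
```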

@wlandau (Member, Author) commented Oct 22, 2019

A couple notes:

  1. For the non-HPC case, let's do the simple thing and spawn dynamic targets inside something like local_build(). We may even be able to leverage that technique for the HPC backends.
  2. Data recovery for dynamic sub-targets is going to require some work. Fortunately, a dynamic target is just a vector of hashes, and we can match those hashes up to the sub-target names.
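The hash-to-name matching might be sketched in base R like this (the helper is hypothetical; the hashes and sub-target names are the illustrative values that appear later in this thread):

```r
# Given the parent's vector of value hashes and a lookup of previously
# stored sub-target names keyed by those hashes, recover which sub-target
# produced each element.
recover_subtargets <- function(hashes, lookup) {
  unname(lookup[hashes])
}

lookup <- c(
  "3908fe5069df3c28" = "y_0b3474bd",
  "16b3cb68bd4872ed" = "y_b2a5c9b8"
)
recover_subtargets(c("16b3cb68bd4872ed", "3908fe5069df3c28"), lookup)
# c("y_b2a5c9b8", "y_0b3474bd")
```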

@wlandau (Member, Author) commented Oct 22, 2019

As I attempt an implementation, I am finding that avoiding saved metadata lists forces me to reinvent a lot of internal machinery. Maybe it is better to save that metadata for dynamic sub-targets after all. The internal overhaul may not be as catastrophic as feared, and we still gain efficiency because we do not need to check the metadata as often.

@wlandau (Member, Author) commented Oct 23, 2019

Yeah, we will need metadata for things like seeds and warnings. But we will still see performance gains in other ways.

@wlandau (Member, Author) commented Oct 26, 2019

It is coming time to work on split(), and I am rethinking it. It should look like split(..., .by) and probably take multiple variables for ... and .by.

@wlandau (Member, Author) commented Oct 26, 2019

On second thought, let's hold off on split(). map() might already have everything we need.

@wlandau (Member, Author) commented Oct 27, 2019

Thoughts on dynamic triggering:

  • For sub-targets, hold on to the metadata of the parent. Put it in the layout.
  • Use different S3 methods for handling triggers: static vs dynamic parent vs dynamic sub-target.
  • condition and change triggers should apply to sub-targets, not parents.

@wlandau (Member, Author) commented Oct 28, 2019

On second thought, let's leave the condition and change triggers as they are. Let's prevent people from using dynamic grouping variables inside condition and change. E.g. this should not be allowed:

drake_plan(
  x = seq_len(4),
  y = target(x, trigger = trigger(condition = x > 2), dynamic = map(x)),
  z = target(x, trigger = trigger(change = x), dynamic = map(x)),
)

@wlandau (Member, Author) commented Oct 30, 2019

To avoid duplicating code over various HPC backends, let's have backend_loop() use the priority queue. With that direction, I will likely work on HPC before triggers.

@wlandau (Member, Author) commented Oct 30, 2019

Registering dynamic sub-targets requires us to modify config objects, specifically the layout, graph, and priority queue. Because of the way the internals are currently structured, it would be best to modify these objects by reference. We already do this with the priority queue, and it is straightforward enough to use an environment instead of a list for the layout. But we may have to wrap the graph in an environment of its own. Added some action items.
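The by-reference point can be illustrated in base R: mutating an environment inside a function persists for the caller, while a list is copied on modification.

```r
# Environments are mutated in place; lists are copy-on-modify.
register_in <- function(layout) {
  layout$new_target <- "registered"
  invisible(layout)
}

layout_list <- list()
register_in(layout_list)
"new_target" %in% names(layout_list)  # FALSE: the caller's list is untouched

layout_env <- new.env()
register_in(layout_env)
"new_target" %in% names(layout_env)   # TRUE: modified by reference
```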

@wlandau (Member, Author) commented Nov 2, 2019

Unfortunately, dynamic branching is currently slower than static branching when it comes to actually building targets.

library(drake)

plan_dynamic <- drake_plan(
  x = seq_len(1e4),
  y = target(x, dynamic = map(x))
)

plan_static <- drake_plan(
  z = target(w, transform = map(w = !!seq_len(1e4)))
)

cache_dynamic <- storr::storr_rds(tempfile())
cache_static <- storr::storr_rds(tempfile())

system.time(
  config_dynamic <- drake_config(
    plan_dynamic,
    cache = cache_dynamic,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   0.026   0.003   0.030

system.time(
  config_static <- drake_config(
    plan_static,
    cache = cache_static,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   1.904   0.004   1.910

system.time(
  suppressWarnings( # different issue
    make(config = config_dynamic)
  )
)
#>    user  system elapsed 
#>  78.014   3.630  81.767

system.time(
  suppressWarnings(
    make(config = config_static)
  )
)
#>    user  system elapsed 
#>  32.712   3.195  36.049

Created on 2019-11-02 by the reprex package (v0.3.0)

@wlandau (Member, Author) commented Nov 2, 2019

The good news is that make() is much faster to initialize. Because we have smaller plans, drake_config() runs super quickly. And for subsequent make()s, it is faster to check if everything is up to date.

library(drake)
library(profile)
library(jointprof)

plan_dynamic <- drake_plan(
  x = seq_len(1e4),
  y = target(x, dynamic = map(x))
)

plan_static <- drake_plan(
  z = target(w, transform = map(w = !!seq_len(1e4)))
)

cache_dynamic <- storr::storr_rds(tempfile())
cache_static <- storr::storr_rds(tempfile())

system.time(
  config_dynamic <- drake_config(
    plan_dynamic,
    cache = cache_dynamic,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   0.027   0.003   0.032

system.time(
  config_static <- drake_config(
    plan_static,
    cache = cache_static,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   3.525   0.004   3.530

Rprof(filename = "dynamic.rprof")
suppressWarnings(
  system.time(make(config = config_dynamic), gcFirst = FALSE)
)
#>    user  system elapsed 
#>  99.096   3.656 102.928
Rprof(NULL)
data <- read_rprof("dynamic.rprof")
write_pprof(data, "dynamic.pprof")

Rprof(filename = "static.rprof")
suppressWarnings(
  system.time(make(config = config_static), gcFirst = FALSE)
)
#>    user  system elapsed 
#>  52.112   3.708  55.916
Rprof(NULL)
data <- read_rprof("static.rprof")
write_pprof(data, "static.pprof")

suppressWarnings(
  system.time(make(config = config_dynamic), gcFirst = FALSE)
)
#>    user  system elapsed 
#>   3.239   0.164   3.418

suppressWarnings(
  system.time(make(config = config_static), gcFirst = FALSE)
)
#>    user  system elapsed 
#>  13.847   0.472  14.347

file.copy("dynamic.pprof", "~/Downloads")
#> [1] TRUE
file.copy("static.pprof", "~/Downloads")
#> [1] TRUE

Created on 2019-11-02 by the reprex package (v0.3.0)

@wlandau (Member, Author) commented Nov 2, 2019

I used those pprof files at the bottom to generate the flame graphs below. The one on the left is from static branching, and the one on the right is from dynamic branching.

[Screenshot_20191102_194727: flame graphs]

It looks like the main hangup is loading sub-target dependencies and registering sub-targets. Not too surprising. Speeding this up is going to be another slow-going long-term project. If you have more examples that demonstrate slowness, please post them. It took a long time to get static branching as fast as it is now, and I expect the same for dynamic branching.

@wlandau (Member, Author) commented Nov 3, 2019

Corrections to #685 (comment)

The implementation in #1042 is different from #685 (comment). In particular, see the flowchart at https://user-images.githubusercontent.com/1580860/66722470-27ede180-eddc-11e9-97ea-930c5a93d287.png.

Procedure for sub-targets

The procedure for sub-targets is actually simpler than I had originally planned.

  1. Check the static triggers of the dynamic target.
  2. If any static trigger fires, build all the sub-targets.
  3. If the static triggers do not fire, check all the sub-targets individually. It is not enough to check the dynamic dependencies as a whole because some of the sub-targets could have been deleted since the last make().

Procedure for dynamic targets as a whole

Each dynamic target has its own value alongside the values of the sub-targets. We recompute this value if

  1. Any sub-target changed, or
  2. Any dynamic dependency changed as a whole.

Why (2)? Because in some situations, we already have all the sub-targets, but we use fewer of them.

library(drake)
plan <- drake_plan(
  x = seq_len(3),
  y = target(x, dynamic = map(x))
)
make(plan)
#> target x
#> subtarget y_0b3474bd
#> subtarget y_b2a5c9b8
#> subtarget y_71f311ad

# readd() and loadd() understand dynamic targets.
readd(y)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 2
#> 
#> [[3]]
#> [1] 3

# But a dynamic target is really just a vector of hashes.
cache <- drake_cache()
cache$get("y")
#> [1] "3908fe5069df3c28" "16b3cb68bd4872ed" "1a3b3c0d06147d80"
#> attr(,"class")
#> [1] "drake_dynamic"

# What if we shorten y?
plan <- drake_plan(
  x = seq_len(2),
  y = target(x, dynamic = map(x))
)

# y needs to change, but we leave the sub-targets alone.
make(plan)
#> target x

# readd() and loadd() understand dynamic targets.
readd(y)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 2

# But a dynamic target is really just a vector of hashes.
cache$get("y")
#> [1] "3908fe5069df3c28" "16b3cb68bd4872ed"
#> attr(,"class")
#> [1] "drake_dynamic"

Created on 2019-11-02 by the reprex package (v0.3.0)

Why the cryptic sub-target names?

The sub-target names are ugly (e.g. y_71f311ad) but incredibly useful.

  1. The suffixes of sub-targets are hashes of dynamic sub-dependencies. In other words, the act of computing the name is the same as checking if it is already up to date! All we need to do is check that the name exists in the cache! (After static triggers, of course.)
  2. Target names from the static DSL get long and cumbersome too easily. A hash solves this problem because it has a fixed length by design, and it remains valid for all kinds of dynamic dependencies.
  3. A natural alternative is to index the sub-targets numerically, e.g. y_1, y_2, etc. (In fact, that is what I originally proposed in #685 (comment).) But if we did that, we would invalidate y_2 every time we insert an element in the middle of x. With hashes, we do not have this problem: the sub-targets of y can be in any order and still remain valid.
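The naming scheme in point 1 might be sketched as follows (digest is a CRAN package; the helper name and the 8-character truncation are illustrative, not drake internals):

```r
# Name a sub-target by hashing the sub-value it depends on.
sub_target_name <- function(parent, sub_value) {
  paste0(parent, "_", substr(digest::digest(sub_value), 1, 8))
}

# Up-to-date checking then reduces to an existence check in the cache, e.g.:
# cache$exists(sub_target_name("y", "a"))
```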
library(drake)
plan <- drake_plan(
  x = c("a", "b"),
  y = target(x, dynamic = map(x))
)

make(plan)
#> In drake, consider r_make() instead of make(). r_make() runs make() in a fresh R session for enhanced robustness and reproducibility.
#> target x
#> subtarget y_89ca58a1
#> subtarget y_38e75e51

plan <- drake_plan(
  x = c("a", "inserted_element", "b"),
  y = target(x, dynamic = map(x))
)

# Only one sub-target needs to build.
make(plan)
#> target x
#> subtarget y_06d53fef

# Permute x.
plan <- drake_plan(
  x = c("inserted_element", "b", "a"),
  y = target(x, dynamic = map(x))
)

# All sub-targets are still up to date!
make(plan)
#> target x

Created on 2019-11-02 by the reprex package (v0.3.0)

@wlandau (Member, Author) commented Nov 3, 2019

Implemented in #1042.

wlandau closed this as completed Nov 3, 2019
@wlandau (Member, Author) commented Nov 3, 2019

Also noteworthy: mapping over rows: #1042 (comment)

@wlandau (Member, Author) commented Nov 4, 2019

New chapter in the manual: https://ropenscilabs.github.io/drake-manual/dynamic.html

@wlandau (Member, Author) commented Nov 4, 2019

One source of overhead I overlooked: computing the hashes of sub-values that go into the names of sub-targets. Unavoidable, but not terrible.

@wlandau (Member, Author) commented Nov 4, 2019

Dynamic parent targets are already vectors of hashes, so we can avoid this overhead if the dynamic dependency is itself dynamic: 5a07f67. Otherwise, we need to compute the hashes of all the sub-values.

@wlandau (Member, Author) commented Dec 6, 2019

Update: dynamic branching just got a huge speed boost in #1089 thanks to help from @billdenney and @eddelbuettel. With improvements both in development drake and development digest, dynamic branching is now about 33% faster than static branching overall. Benchmarking workflow: https://github.com/wlandau/drake-examples/blob/master/overhead/dynamic.R vs https://github.com/wlandau/drake-examples/blob/master/overhead/static.R.
