Aftermath of staged parallelism refactoring #377

Merged
merged 22 commits into from
May 5, 2018
1 change: 0 additions & 1 deletion NAMESPACE
@@ -103,7 +103,6 @@ export(migrate_drake_project)
export(missed)
export(mk)
export(new_cache)
export(next_stage)
export(num_range)
export(one_of)
export(outdated)
7 changes: 6 additions & 1 deletion NEWS.md
@@ -1,10 +1,15 @@
# Version 5.1.4

- Remove staged parallelism from all the `lapply()`-like backends (`parallelism =` `"mclapply"`, `"parLapply"`, and `"future_lapply"`). Now, `drake` uses persistent workers and a master process. In the case of `"future_lapply"` parallelism, the master process is a separate `callr::r_bg()` process.
- Remove the appearance of staged parallelism from single-job `make()`'s. (Previously, there were "check" messages and a call to `staged_parallelism()`.)
- Remove the appearance of staged parallelism from single-job `make()`'s.
(Previously, there were "check" messages and a call to `staged_parallelism()`.)
- Remove all remnants of staged parallelism internals.
- Improve `predict_runtime()`. It now takes a more sensible approach to predicting runtimes with multiple jobs and should be more accurate.
- Calls to `make()` no longer leave targets in the user's environment.
- Attempt to fix a Solaris CRAN check error. The test at https://github.com/ropensci/drake/blob/b4dbddb840d2549621b76bcaa46c344b0fd2eccc/tests/testthat/test-edge-cases.R#L3 was previously failing on CRAN's Solaris machine (R 3.5.0). In the test, one of the threads deliberately quits in error, and the R/Solaris installation did not handle this properly. The test should work now because it no longer uses any parallelism.
- Deprecate the `imports_only` argument to `make()` and `drake_config()` in favor of `skip_targets`.
- Deprecate `migrate_drake_project()`.
- Deprecate `max_useful_jobs()`.
- For non-distributed parallel backends, stop waiting for all the imports to finish before the targets begin.
- Add an `upstream_only` argument to `failed()` so users can list failed targets that do not have any failed dependencies. Naturally accompanies `make(keep_going = TRUE)`.
- Add an RStudio R Markdown template compatible with https://krlmlr.github.io/drake-pitch/.
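
A minimal sketch of the user-facing behavior the NEWS entries above describe. The plan and target names are made up for illustration; `parallelism = "mclapply"`, `keep_going`, and `failed(upstream_only = TRUE)` are the pieces named in the entries.

```r
library(drake)

# Hypothetical plan with one target that deliberately fails.
plan <- drake_plan(
  small = sample.int(10),
  large = sample.int(1000),
  broken = stop("deliberate failure"),
  summary = c(length(small), length(large))
)

# The lapply-like backends now use persistent workers plus a master process.
# (mclapply-based parallelism assumes a Unix-alike system.)
make(plan, parallelism = "mclapply", jobs = 2, keep_going = TRUE)

# List failed targets whose dependencies all succeeded
# (the new upstream_only argument mentioned above).
failed(upstream_only = TRUE)
```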
6 changes: 1 addition & 5 deletions R/Makefile.R
@@ -119,11 +119,7 @@ mk <- function(
){
config <- recover_drake_config(cache_path)
old_hash <- self_hash(target = target, config = config)
build_distributed(
target = target,
meta_list = NULL,
cache_path = cache_path
)
build_distributed(target = target, cache_path = cache_path)
new_hash <- self_hash(target = target, config = config)
if (!identical(old_hash, new_hash)){
file <- time_stamp_file(target = target, config = config)
1 change: 1 addition & 0 deletions R/build_times.R
@@ -38,6 +38,7 @@ build_times <- function(
jobs = 1,
type = c("build", "command")
){
eval(parse(text = "require(methods, quietly = TRUE)")) # needed for lubridate
if (is.null(cache)){
return(empty_times())
}
141 changes: 141 additions & 0 deletions R/deprecate.R
@@ -579,6 +579,69 @@ load_basic_example <- function(
)
}

#' @title Deprecated function
#' @description Do not use this function. `Drake`'s parallel algorithm
#' has changed since version 5.1.2, so `max_useful_jobs()`
#' will give you the wrong idea of how many jobs to use. Instead,
#' use the [predict_runtime()] function with sensible values
#' for `known_times` and `default_time`
#' to cover any targets not built so far.
#' @details Deprecated on May 4, 2018.
#' @export
#' @keywords internal
#' @return A numeric scalar, the maximum number of useful jobs for
#' \code{\link{make}(..., jobs = ...)}.
#' @seealso [predict_runtime()]
#' @param config internal configuration list of \code{\link{make}(...)},
#' also produced by [drake_config()].
#' @param imports Set the `imports` argument to change your
#' assumptions about how fast objects/files are imported.
#' @param from_scratch logical, whether to assume
#' the next [make()] will run from scratch
#' so that all targets are attempted.
#' @examples
#' # Do not use this function. Use predict_runtime() instead.
#' # Pay special attention to the known_times and default_time
#' # arguments.
max_useful_jobs <- function(
config = drake::read_drake_config(),
imports = c("files", "all", "none"),
from_scratch = FALSE
){
.Deprecated(
"predict_runtime",
package = "drake",
msg = c(
"Do not use max_useful_jobs(). ",
"Drake's parallel scheduling algorithm has changed, ",
"so max_useful_jobs() will give you the wrong idea about ",
"how many jobs to assign to `make()`. For a better estimate, ",
"play around with predict_runtime() with sensible values, ",
"for force_times and default_time."
)
)
# nocov start
imports <- match.arg(imports)
nodes <- dataframes_graph(config, from_scratch = from_scratch)$nodes
if (imports == "none"){
nodes <- nodes[nodes$status != "imported", ]
} else if (imports == "files"){
nodes <- nodes[nodes$status != "imported" | nodes$type == "file", ]
}
if (!from_scratch){
nodes <- nodes[nodes$status != "outdated", ]
}
if (!nrow(nodes)){
return(0)
}
level <- NULL
n_per_level <- group_by(nodes, level) %>%
mutate(nrow = n())
max(n_per_level$nrow)
# nocov end
}
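
A rough sketch of the recommended replacement, following the deprecation message above. The plan is hypothetical, and `default_time` is one of the two arguments named in the docs above (`known_times`, `default_time`); check `?predict_runtime` for the exact signature in your version.

```r
library(drake)

# Hypothetical plan and config.
plan <- drake_plan(data = head(mtcars), model = lm(mpg ~ wt, data = data))
config <- drake_config(plan)

# Estimate total runtime, assuming 60 seconds for each target
# that has not been timed yet.
predict_runtime(config, default_time = 60)
```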


#' @title Deprecated function `plan`
#' @description Use [drake_plan()] instead.
#' @details Deprecated on 2017-10.
@@ -805,6 +868,53 @@ plot_graph <- function(
)
}

#' @title Defunct
#' @description This function is now moot because
#' staged parallelism in `drake` was replaced
#' by a much better scheduling algorithm.
#' @export
#' @keywords internal
#' @details Made defunct on May 4, 2018.
#' @examples
#' # Do not use this function.
#' @return A data frame of times of the worst-case scenario
#' rate-limiting targets in each parallelizable stage.
#' @param config optional internal runtime parameter list of
#' \code{\link{make}(...)},
#' produced by both [make()] and
#' [drake_config()].
#' @param targets Character vector, names of targets.
#' Find the rate-limiting times for building these targets
#' plus dependencies.
#' Defaults to all targets.
#' @param from_scratch logical, whether to assume
#' next hypothetical call to [make()]
#' is a build from scratch (after [clean()]).
#' @param targets_only logical, whether to factor in just the
#' targets or use times from everything, including the imports.
#' @param future_jobs hypothetical number of jobs
#' assumed for the predicted runtime.
#' @param digits number of digits for rounding the times.
rate_limiting_times <- function(
config = drake::read_drake_config(),
targets = NULL,
from_scratch = FALSE,
targets_only = FALSE,
future_jobs = 1,
digits = 3
){
.Defunct(
package = "drake",
msg = c(
"The rate_limiting_times() function is moot ",
"because drake has replaced staged parallelism ",
"with a much better algorithm. ",
"Do not use rate_limiting_times()."
)
)
}

#' @title Deprecated function `read_config`
#' @description Use [read_drake_config()] instead.
#' @details Deprecated on 2017-11-12.
@@ -1071,6 +1181,37 @@ session <- function(
)
}

#' @title Defunct function
#' @description Staged parallelism has been removed from drake,
#' so this function is moot.
#' Drake uses a much better parallel algorithm now.
#' @details Made defunct on May 4, 2018.
#' @export
#' @keywords internal
#' @return A data frame of information spelling out how
#' targets are divided into parallelizable stages
#' (according to the `stage` column).
#' @param config A configuration list output by
#' [make()] or [drake_config()].
#' @param from_scratch logical, whether to assume
#' that the next [make()] will run from scratch
#' so that all targets are attempted.
#' @examples
#' # Do not use this function.
parallel_stages <- function(
config = drake::read_drake_config(),
from_scratch = FALSE
){
.Defunct(
package = "drake",
msg = c(
"Staged parallelism is removed from drake, ",
"so the parallel_stages() function is moot. ",
"Drake uses a much better parallel algorithm now."
)
)
}

#' @title Deprecated function `summaries`
#' @description Use [summaries()] instead
#' @details Deprecated on 2017-11-12.
20 changes: 2 additions & 18 deletions R/distributed.R
@@ -41,29 +41,13 @@ finish_distributed <- function(config){
unlink(file, force = TRUE)
}

build_distributed <- function(target, meta_list, cache_path){
build_distributed <- function(target, cache_path){
config <- recover_drake_config(cache_path = cache_path)
config$hook({
eval(parse(text = "base::require(drake, quietly = TRUE)"))
do_prework(config = config, verbose_packages = FALSE)
prune_envir(targets = target, config = config)
})
if (is.null(meta_list)){
meta_list <- meta_list(targets = target, config = config)
do_build <- should_build_target(
target = target,
meta = meta_list[[target]],
config = config
)
if (!do_build){
return(invisible())
}
}
build_and_store(
target = target,
meta = meta_list[[target]],
config = config
)
build_check_store(target = target, config = config)
invisible()
}

27 changes: 0 additions & 27 deletions R/envir.R
@@ -9,33 +9,6 @@ assign_to_envir <- function(target, value, config){
invisible()
}

# Should go away when staged parallelism goes away.
assign_to_envir_batch <- function(targets, values, config){
if (config$lazy_load != "eager"){
return() # nocov
}
lightly_parallelize(
X = seq_along(along.with = targets),
FUN = assign_to_envir_single,
jobs = config$jobs,
targets = targets,
values = values,
config = config
)
invisible()
}

# Same.
assign_to_envir_single <- function(index, targets, values, config){
target <- targets[index]
value <- values[[index]]
if (is_file(target) | !(target %in% config$plan$target)){
return()
}
assign(x = target, value = value, envir = config$envir) # nocov
invisible() # nocov
}

prune_envir <- function(targets, config, downstream = NULL){
if (is.null(downstream)){
downstream <- downstream_nodes(
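
Related usage note: with the environment-assignment helpers above removed, `make()` no longer leaves target values in the calling environment (see the NEWS entry above), so values come from the cache instead. A minimal sketch with a hypothetical plan:

```r
library(drake)

plan <- drake_plan(mtcars_summary = summary(mtcars))
make(plan)

readd(mtcars_summary)   # return the cached value of the target
loadd(mtcars_summary)   # or load it into the current environment explicitly
```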
1 change: 0 additions & 1 deletion R/future_lapply.R
@@ -23,7 +23,6 @@ fl_worker <- function(worker, cache_path){
expr = {
config <- recover_drake_config(cache_path = cache_path)
on.exit(mc_set_done(worker = worker, config = config))
config$schedule <- targets_graph(config)
do_prework(config = config, verbose_packages = FALSE)
mc_worker(worker = worker, config = config)
},
32 changes: 10 additions & 22 deletions R/graph.R
@@ -222,32 +222,20 @@ filter_upstream <- function(targets, graph){
leaf_nodes(graph)
}

# This function will go away when we get rid of staged parallelism.
# No point in testing it.
# nocov start
exclude_imports_if <- function(config){
if (!length(config$skip_imports)){
config$skip_imports <- FALSE
}
if (!config$skip_imports){
return(config)
}
delete_these <- setdiff(
V(config$schedule)$name,
config$plan$target
)
config$schedule <- delete_vertices(
graph = config$schedule,
v = delete_these
)
config
}
# nocov end

subset_graph <- function(graph, subset){
if (!length(subset)){
return(graph)
}
subset <- intersect(subset, V(graph)$name)
igraph::induced_subgraph(graph = graph, vids = subset)
}

imports_graph <- function(config){
delete_these <- intersect(config$plan$target, V(config$graph)$name)
delete_vertices(config$graph, v = delete_these)
}

targets_graph <- function(config){
delete_these <- setdiff(V(config$graph)$name, config$plan$target)
delete_vertices(config$graph, v = delete_these)
}
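
A standalone sketch of the partitioning idea behind `imports_graph()` and `targets_graph()` above: vertices named in the plan are targets, everything else is an import. The toy graph and names below are made up.

```r
library(igraph)

# Toy dependency graph: two imports feed two plan targets.
g <- graph_from_literal(raw_file -+ dataset, helper_fun -+ model, dataset -+ model)
plan_targets <- c("dataset", "model")  # stand-in for config$plan$target

imports_only <- delete_vertices(g, v = intersect(plan_targets, V(g)$name))
targets_only <- delete_vertices(g, v = setdiff(V(g)$name, plan_targets))

V(imports_only)$name  # "raw_file"  "helper_fun"
V(targets_only)$name  # "dataset"   "model"
```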
43 changes: 18 additions & 25 deletions R/make.R
@@ -210,25 +210,11 @@ global_imports <- function(config){
#' @export
#' @inheritParams make_with_config
make_session <- function(config){
if (config$skip_imports && config$skip_targets){
return(invisible(config))
}
check_drake_config(config = config)
store_drake_config(config = config)
initialize_session(config = config)
do_prework(config = config, verbose_packages = config$verbose)
if (config$skip_imports){
make_targets(config = config)
} else if (config$skip_targets){
make_imports(config = config)
} else if (
config$parallelism %in% parallelism_choices(distributed_only = TRUE)
){
make_imports(config = config)
make_targets(config = config)
} else {
make_imports_targets(config = config)
}
make_with_schedules(config = config)
drake_cache_log_file(
file = config$cache_log_file,
cache = config$cache,
@@ -241,6 +227,23 @@
return(invisible(config))
}

make_with_schedules <- function(config){
if (config$skip_imports && config$skip_targets){
invisible(config)
} else if (config$skip_targets){
make_imports(config = config)
} else if (config$skip_imports){
make_targets(config = config)
} else if (
config$parallelism %in% parallelism_choices(distributed_only = TRUE)
){
make_imports(config = config)
make_targets(config = config)
} else {
make_imports_targets(config = config)
}
}
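
A sketch of the user-facing calls that exercise each branch of `make_with_schedules()` above, with a hypothetical plan. The `skip_imports`/`skip_targets` equivalences are inferred from the dispatch logic, not quoted from the docs.

```r
library(drake)

plan <- drake_plan(data = head(mtcars), result = summary(data))
config <- drake_config(plan)

make_imports(config)   # imports only, like make(plan, skip_targets = TRUE)
make_targets(config)   # targets only, like make(plan, skip_imports = TRUE)
make(plan)             # non-distributed default: one combined schedule
```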

#' @title Just make the imports.
#' @description [make()] is the central, most important function
#' of the drake package. [make()] runs all the steps of your
@@ -285,11 +288,6 @@ make_imports <- function(config = drake::read_drake_config()){
invisible(config)
}

imports_graph <- function(config){
delete_these <- intersect(config$plan$target, V(config$graph)$name)
delete_vertices(config$graph, v = delete_these)
}

#' @title Just build the targets.
#' @description [make()] is the central, most important function
#' of the drake package. [make()] runs all the steps of your
@@ -334,11 +332,6 @@ make_targets <- function(config = drake::read_drake_config()){
invisible(config)
}

targets_graph <- function(config){
delete_these <- setdiff(V(config$graph)$name, config$plan$target)
delete_vertices(config$graph, v = delete_these)
}

make_imports_targets <- function(config){
config$schedule <- config$graph
config$jobs <- max(config$jobs)