Skip to content

Commit

Permalink
Add support for retrying tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
richfitz committed Jan 4, 2024
1 parent bce2ed8 commit bfff0d9
Show file tree
Hide file tree
Showing 13 changed files with 352 additions and 24 deletions.
2 changes: 2 additions & 0 deletions R/root.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ hipercow_root <- function(root = NULL) {
root = path,
tasks = file.path(path, "hipercow", "tasks"),
environments = file.path(path, "hipercow", "environments"),
retry = file.path(path, "hipercow", "retry"),
config = file.path(path, "hipercow", "config"))
if (file.exists(ret$path$config)) {
## TODO: for now we assume that config is saved/loaded by rds;
Expand All @@ -70,6 +71,7 @@ hipercow_root <- function(root = NULL) {
ret$config <- set_names(lapply(files, readRDS),
sub("\\.rds$", "", basename(files)))
}
ret$retry_map <- read_retry_map(ret$path$retry)
ret$cache$task_driver <- character()
ret$cache$task_status_terminal <- character()
class(ret) <- "hipercow_root"
Expand Down
105 changes: 105 additions & 0 deletions R/task-retry.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
##' Retry one or more tasks. This creates a new task that copies the
##' work of the old one. Most of the time this is transparent. We'll
##' document this in the "advanced" vignette once it's written.
##'
##' This ends up being a little more complicated than ideal in order
##' to keep things relatively fast, while keeping our usual guarantees
##' about race conditions etc. Basically; retrying is the only way a
##' task can move out of a terminal state but it still does not modify
##' the existing task. Instead, we keep a separate register of
##' whether a task has been retried or not. Each time we retry we
##' write into this register. When you query about the status etc of
##' a task you can then add a `follow` argument to control whether or
##' not we check the register. We assume that you never call this in
##' parallel; if you do then retries may be lost. You can run
##' `task_retry(NULL)` to refresh the cached copy of the retry map if
##' you need to.
##'
##' @title Retry a task
##'
##' @param id The identifier or identifiers of tasks to retry.
##'
##' @inheritParams task_create_explicit
##'
##' @return export
##' @return New identifiers for the retried tasks
task_retry <- function(id, submit = NULL, root = NULL) {
root <- hipercow_root(root)
id_real <- follow_retry_map(id, root)
status <- task_status(id_real, follow = FALSE, root)
err <- !(status %in% c("success", "failure", "cancelled"))
if (any(err)) {
n <- sum(err)
i <- utils::head(which(err), 5)
details <- set_names(sprintf("%s: %s", id_real[i], status[i]), "x")
if (length(i) < n) {
details <- c(details, c(i = "...and {n - length(i)} other task{?s}"))
}
cli::cli_abort(c("{n} {?task does/tasks do} not have terminal status",
details))
}
id_base <- base_retry_map(id_real, root)
id_new <- vcapply(seq_along(id), function(i) {
task_create(root, "retry", NULL, NULL,
parent = id_real[[i]], base = id_base[[i]])
})

update_retry_map(id_new, id_real, id_base, root)

task_submit_maybe(id_new, submit, root, rlang::current_env())

id_new
}


read_retry_map <- function(path) {
if (file.exists(path)) {
ret <- read.csv(path, header = FALSE)
names(ret) <- c("id", "parent", "base")
ret
} else {
data.frame(id = character(), parent = character(), base = character())
}
}


update_retry_map <- function(id, parent, base, root) {
if (length(id) > 0) {
append_lines(sprintf("%s,%s,%s", id, parent, base),
root$path$retry)
}
root$retry_map <- read_retry_map(root$path$retry)
}


## Lots of ways to optimise this if it becomes a timesink. Probably
## worth special case if length(id) == 1, possibly worth a lookup we
## invalidate with each map read?
follow_retry_map <- function(id, root) {
map <- root$retry_map
if (nrow(map) == 0 || length(id) == 0) {
## If noone has done a retry, early exit here
return(id)
}

ret <- id
while (!all(is.na(id))) {
id <- map$id[match(id, map$parent)]
i <- !is.na(id)
ret[i] <- id[i]
}
ret
}


base_retry_map <- function(id, root) {
map <- root$retry_map
if (nrow(map) == 0 || length(id) == 0) {
return(id)
}
i <- match(id, map$id)
ret <- map$base[i]
j <- is.na(i)
ret[j] <- id[j]
ret
}
50 changes: 34 additions & 16 deletions R/task.R
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ task_eval <- function(id, envir = .GlobalEnv, verbose = FALSE, root = NULL) {
cli::cli_alert_info("starting at: {t0}")
}
path <- file.path(root$path$tasks, id)
status <- task_status(id, root = root)
status <- task_status(id, follow = FALSE, root = root)
if (status %in% c("running", "success", "failure", "cancelled")) {
cli::cli_abort("Can't start task '{id}', which has status '{status}'")
}
Expand All @@ -192,6 +192,13 @@ task_eval <- function(id, envir = .GlobalEnv, verbose = FALSE, root = NULL) {
cli::cli_alert_info("task type: {data$type}")
}
result <- rlang::try_fetch({
if (data$type == "retry") {
data <- readRDS(file.path(root$path$tasks, data$base, EXPR))
if (verbose) {
cli::cli_alert_info("pointing at {data$id} ({data$type})")
}
data$id <- id
}
environment_apply(data$environment, envir, root, top)
check_globals(data$variables$globals, envir, top)
withr::local_dir(file.path(root$path$root, data$path))
Expand Down Expand Up @@ -268,13 +275,16 @@ task_eval <- function(id, envir = .GlobalEnv, verbose = FALSE, root = NULL) {
##'
##' @param id The task identifier
##'
##' @param follow Logical, indicating if we should follow any retried
##' tasks.
##'
##' @inheritParams task_eval
##'
##' @return A string with the task status. Tasks that do not exist
##' will have a status of `NA`.
##'
##' @export
task_status <- function(id, root = NULL) {
task_status <- function(id, follow = TRUE, root = NULL) {
## This function is fairly complicated because we try to do as
## little work as possible; querying the network file system is
## fairly expensive and we assume that hitting the underlying driver
Expand All @@ -293,6 +303,10 @@ task_status <- function(id, root = NULL) {

status <- rep(NA_character_, length(id))

if (follow) {
id <- follow_retry_map(id, root)
}

## Fastest possible exit; we know that this task has a terminal
## status so we return it from the cache
i <- match(id, names(root$cache$task_status_terminal))
Expand Down Expand Up @@ -385,12 +399,13 @@ task_get_driver <- function(id, root = NULL) {
##'
##' @return The value of the queued expression
##' @export
task_result <- function(id, root = NULL) {
task_result <- function(id, follow = TRUE, root = NULL) {
root <- hipercow_root(root)
id <- follow_retry_map(id, root)
path <- file.path(root$path$tasks, id)
path_result <- file.path(path, RESULT)
if (!file.exists(path_result)) {
status <- task_status(id, root)
status <- task_status(id, follow = FALSE, root = root)
if (allow_load_drivers()) {
task_driver <- task_get_driver(id, root = root)
} else {
Expand Down Expand Up @@ -436,9 +451,9 @@ task_result <- function(id, root = NULL) {
##'
##' @rdname task_log
##' @export
task_log_show <- function(id, outer = FALSE, root = NULL) {
task_log_show <- function(id, follow = TRUE, outer = FALSE, root = NULL) {
root <- hipercow_root(root)
result <- task_log_fetch(id, outer, root)
result <- task_log_fetch(id, follow, outer, root)
if (is.null(result)) {
cli::cli_alert_danger("No logs for task '{id}' (yet?)")
} else if (length(result) == 0) {
Expand All @@ -452,9 +467,9 @@ task_log_show <- function(id, outer = FALSE, root = NULL) {

##' @rdname task_log
##' @export
task_log_value <- function(id, outer = FALSE, root = NULL) {
task_log_value <- function(id, follow = TRUE, outer = FALSE, root = NULL) {
root <- hipercow_root(root)
task_log_fetch(id, outer, root)
task_log_fetch(id, follow, outer, root)
}


Expand All @@ -465,9 +480,10 @@ task_log_value <- function(id, outer = FALSE, root = NULL) {
##' @inheritParams logwatch::logwatch
##'
##' @export
task_log_watch <- function(id, poll = 1, skip = 0, timeout = Inf,
task_log_watch <- function(id, follow = TRUE, poll = 1, skip = 0, timeout = Inf,
progress = NULL, root = NULL) {
root <- hipercow_root(root)
id <- follow_retry_map(id, root)

## As in task_log_fetch; no need to do this each time around:
driver <- task_get_driver(id, root = root)
Expand All @@ -481,7 +497,7 @@ task_log_watch <- function(id, poll = 1, skip = 0, timeout = Inf,
ensure_package("logwatch")
res <- logwatch::logwatch(
"task",
get_status = function() task_status(id, root = root),
get_status = function() task_status(id, follow = FALSE, root = root),
get_log = function() dat$driver$log(id, FALSE, dat$config, root$path$root),
status_waiting = "submitted",
status_running = "running",
Expand All @@ -494,7 +510,8 @@ task_log_watch <- function(id, poll = 1, skip = 0, timeout = Inf,
}


task_log_fetch <- function(id, outer, root) {
task_log_fetch <- function(id, follow, outer, root) {
id <- follow_retry_map(id, root)
driver <- task_get_driver(id, root = root)
dat <- hipercow_driver_prepare(driver, root, environment())
dat$driver$log(id, outer, dat$config, root$path$root)
Expand Down Expand Up @@ -543,10 +560,11 @@ final_status_to_logical <- function(status) {
##' `FALSE` otherwise.
##'
##' @export
task_wait <- function(id, timeout = Inf, poll = 1, progress = NULL,
root = NULL) {
task_wait <- function(id, follow = TRUE, timeout = Inf, poll = 1,
progress = NULL, root = NULL) {
root <- hipercow_root(root)
status <- task_status(id, root = root)
id <- follow_retry_map(id, root)
status <- task_status(id, follow = FALSE, root = root)

if (status == "created") {
cli::cli_abort(
Expand All @@ -559,7 +577,7 @@ task_wait <- function(id, timeout = Inf, poll = 1, progress = NULL,
ensure_package("logwatch")
res <- logwatch::logwatch(
sprintf("task '%s'", id),
function() task_status(id, root = root),
function() task_status(id, follow = FALSE, root = root),
function() NULL,
show_log = FALSE,
show_spinner = show_progress(progress, call),
Expand Down Expand Up @@ -593,7 +611,7 @@ task_wait <- function(id, timeout = Inf, poll = 1, progress = NULL,
task_cancel <- function(id, root = NULL) {
root <- hipercow_root(root)
cancelled <- rep(FALSE, length(id))
status <- task_status(id, root)
status <- task_status(id, follow = FALSE, root = root)
eligible <- status %in% c("submitted", "running")
if (any(eligible)) {
task_driver <- vcapply(id, task_get_driver, root = root)
Expand Down
13 changes: 12 additions & 1 deletion R/util.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@


set_names <- function(x, nms) {
if (length(nms) == 1 && length(nms) != length(x)) {
nms <- rep(nms, length(x))
}
names(x) <- nms
x
}
Expand Down Expand Up @@ -92,7 +95,8 @@ package_version_if_installed <- function(name) {
tryCatch(utils::packageVersion(name),
error = function(e) NULL)
}



eval_with_hr <- function(expr, title, verbose) {
if (verbose) {
cli::cli_rule(right = "{title} {cli::symbol$arrow_down}")
Expand Down Expand Up @@ -150,3 +154,10 @@ show_collected_warnings <- function(warnings) {
cli::cli_alert_info("Only last {nwarnings} distinct warnings shown")
}
}


append_lines <- function(text, path) {
con <- file(path, "a")
on.exit(close(con))
writeLines(text, con)
}
8 changes: 6 additions & 2 deletions man/task_log.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion man/task_result.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions man/task_retry.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bfff0d9

Please sign in to comment.