Merged
Changes from all commits (31 commits)
5f58eb5
add documentation and sample for long running job
zfengms Oct 4, 2017
91faa5f
update sample file name
zfengms Oct 4, 2017
ea0d3e9
update long running job doc and test
zfengms Oct 5, 2017
a4f4b78
update metadata code
zfengms Oct 13, 2017
560293f
Merge branch 'master' into feature/longrunjob
zfengms Oct 13, 2017
2106f23
add errorHandling to job metadata
zfengms Oct 13, 2017
6c2fd92
add deleteJob to delete both job definition and job result
zfengms Oct 24, 2017
9b8f70a
merge from master
zfengms Oct 25, 2017
f7c5b27
styling fix
zfengms Oct 25, 2017
58de322
save foreach wait setting to metadata
zfengms Oct 25, 2017
b4248cf
implement retry logic in getjobresult
zfengms Oct 25, 2017
08e8943
add terminateJob
zfengms Oct 25, 2017
11f6484
handle various corner cases
zfengms Oct 31, 2017
7e8154e
merge from master to feature/longrunjob
zfengms Nov 17, 2017
e40d99f
regenerate document
zfengms Nov 17, 2017
a64b9aa
add job state in getJob
zfengms Nov 17, 2017
24d7ccb
merge from master
zfengms Nov 17, 2017
2afb05d
do not fail getJobResult if getMetadata failed for backward compatibi…
zfengms Nov 21, 2017
1ce98e2
add deleteJob option to foreach, by default it is true for wait=TRUE job
zfengms Nov 21, 2017
4362185
styling fix
zfengms Nov 22, 2017
b33df85
Merge branch 'master' into feature/longrunjob
zfengms Nov 22, 2017
29bb5a5
update version and changelog
zfengms Nov 22, 2017
c7b0453
address review feedback
zfengms Dec 5, 2017
21e529c
Merge branch 'master' into feature/longrunjob
zfengms Dec 5, 2017
a9eec76
add setJobAutoDelete function
zfengms Dec 6, 2017
25cd325
rename jobAutoDelete to autoDeleteJob to workaround R bugs and update…
zfengms Dec 6, 2017
9429b42
update faq
zfengms Dec 6, 2017
4c757e3
fix styling issues
zfengms Dec 6, 2017
0b0423c
more styling fix
zfengms Dec 6, 2017
b58c647
roll back manual update to DESCRIPTION
zfengms Dec 6, 2017
3457c9b
add namespace to api call
zfengms Dec 6, 2017
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,14 @@
# Change Log
## [0.6.1] 2017-11-13

## [0.6.1] 2017-12-05
### Added
- Support for users to use programmatically generated credentials and cluster config
- Support for users to delete job and terminate job
### Changed
- [BREAKING CHANGE] When wait = TRUE, both the job and its results are deleted at the end of the run; set autoDeleteJob to FALSE to keep them
- Add retry to get job result
- Add errorHandling and wait option to job metadata
- Save job metadata to job result storage blob
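
In practice, a synchronous run now cleans up after itself. A minimal sketch of opting out, assuming a registered cluster and the .options.azure foreach interface used by this package:

# keep the job and its results after a wait = TRUE run,
# either globally for the session or per foreach loop
setAutoDeleteJob(FALSE)
results <- foreach(i = 1:4,
                   .options.azure = list(autoDeleteJob = FALSE)) %dopar% {
  sqrt(i)
}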

## [0.6.0] 2017-11-03
### Added
3 changes: 3 additions & 0 deletions NAMESPACE
@@ -1,6 +1,7 @@
# Generated by roxygen2: do not edit by hand

export(createOutputFile)
export(deleteJob)
export(deleteStorageContainer)
export(deleteStorageFile)
export(generateClusterConfig)
@@ -16,11 +17,13 @@ export(listStorageFiles)
export(makeCluster)
export(registerDoAzureParallel)
export(resizeCluster)
export(setAutoDeleteJob)
export(setChunkSize)
export(setCredentials)
export(setHttpTraffic)
export(setReduce)
export(setVerbose)
export(stopCluster)
export(terminateJob)
export(waitForNodesToComplete)
export(waitForTasksToComplete)
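
The two new job-management exports pair naturally: terminateJob stops a job that is still running, and deleteJob removes what is left behind. A short sketch with a hypothetical job id:

terminateJob("job20171205")  # stop the running job; a 409 means it already completed
deleteJob("job20171205")     # delete the Batch job and its result storage container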
86 changes: 69 additions & 17 deletions R/doAzureParallel.R
@@ -73,6 +73,20 @@ setChunkSize <- function(value = 1) {
assign("chunkSize", value, envir = .doAzureBatchGlobals)
}

#' Specify whether to delete the job and its results after an asynchronous job is completed.
#'
#' @param value A logical value, TRUE or FALSE
#'
#' @examples
#' setAutoDeleteJob(FALSE)
#' @export
setAutoDeleteJob <- function(value = TRUE) {
if (!is.logical(value))
stop("setAutoDeleteJob requires a boolean argument")

assign("autoDeleteJob", value, envir = .doAzureBatchGlobals)
}
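
The value stored here is only a session-wide default; as the resolution logic inside %dopar% below shows, a per-loop autoDeleteJob option takes precedence. A sketch, assuming the .options.azure interface:

setAutoDeleteJob(FALSE)  # session default: keep jobs and results

# the per-loop option wins for this run only
foreach(i = 1:10,
        .options.azure = list(autoDeleteJob = TRUE)) %dopar% { i * 2 }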

#' Apply reduce function on a group of iterations of the foreach loop together per task.
#'
#' @param fun The reduce function applied to each group of iterations
@@ -232,6 +246,18 @@ setHttpTraffic <- function(value = FALSE) {
wait <- obj$options$azure$wait
}

# by default, delete both job and job result after synchronous job is completed
autoDeleteJob <- TRUE

if (exists("autoDeleteJob", envir = .doAzureBatchGlobals)) {
autoDeleteJob <- get("autoDeleteJob", envir = .doAzureBatchGlobals)
}

if (!is.null(obj$options$azure$autoDeleteJob) &&
is.logical(obj$options$azure$autoDeleteJob)) {
autoDeleteJob <- obj$options$azure$autoDeleteJob
}

inputs <- FALSE
if (!is.null(obj$options$azure$inputs)) {
storageCredentials <- rAzureBatch::getStorageCredentials()
@@ -282,6 +308,10 @@ setHttpTraffic <- function(value = FALSE) {

chunkSize <- 1

if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
}

if (!is.null(obj$options$azure$chunkSize)) {
chunkSize <- obj$options$azure$chunkSize
}
@@ -290,25 +320,34 @@ setHttpTraffic <- function(value = FALSE) {
chunkSize <- obj$options$azure$chunksize
}

if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
}

chunkSizeKeyValuePair <-
list(name = "chunkSize", value = as.character(chunkSize))

if (is.null(obj$packages)) {
metadata <-
list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
} else {
metadata <-
list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)

if (!is.null(obj$packages)) {
packagesKeyValuePair <-
list(name = "packages",
value = paste(obj$packages, collapse = ";"))

metadata <-
list(enableCloudCombineKeyValuePair,
chunkSizeKeyValuePair,
packagesKeyValuePair)
metadata[[length(metadata) + 1]] <- packagesKeyValuePair
}

if (!is.null(obj$errorHandling)) {
errorHandlingKeyValuePair <-
list(name = "errorHandling",
value = as.character(obj$errorHandling))

metadata[[length(metadata) + 1]] <- errorHandlingKeyValuePair
}

if (!is.null(obj$options$azure$wait)) {
waitKeyValuePair <-
list(name = "wait",
value = as.character(obj$options$azure$wait))

metadata[[length(metadata) + 1]] <- waitKeyValuePair
}

retryCounter <- 0
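
Each option is appended as a name/value pair of character strings, so a job's metadata ends up shaped roughly like this (values illustrative):

metadata <- list(
  list(name = "enableCloudCombine", value = "TRUE"),
  list(name = "chunkSize",          value = "2"),
  list(name = "errorHandling",      value = "stop"),
  list(name = "wait",               value = "FALSE")
)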
@@ -454,6 +493,10 @@ setHttpTraffic <- function(value = FALSE) {
job <- rAzureBatch::getJob(id)
cat(sprintf("Id: %s", job$id), fill = TRUE)

if (!is.null(job$id)) {
saveMetadataBlob(job$id, metadata)
}

ntasks <- length(argsList)

startIndices <- seq(1, length(argsList), chunkSize)
@@ -542,7 +585,7 @@ setHttpTraffic <- function(value = FALSE) {

numberOfFailedTasks <- sum(unlist(failTasks))

if (numberOfFailedTasks > 0) {
if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) {
.createErrorViewerPane(id, failTasks)
}

@@ -563,13 +606,22 @@ setHttpTraffic <- function(value = FALSE) {
cat(sprintf("Number of errors: %i", numberOfFailedTasks),
fill = TRUE)

rAzureBatch::deleteJob(id)
# delete job from batch service and job result from storage blob
if (autoDeleteJob) {
deleteJob(id)
}

if (identical(obj$errorHandling, "stop") &&
!is.null(errorValue)) {
msg <- sprintf("task %d failed - '%s'",
errorIndex,
conditionMessage(errorValue))
msg <-
sprintf(
paste0(
"task %d failed - '%s'.\r\nBy default job and its result is deleted after run is over, use",
" setAutoDeleteJob(FALSE) or autoDeleteJob = FALSE option to keep them for investigation."
),
errorIndex,
conditionMessage(errorValue)
)
stop(simpleError(msg, call = expr))
}
else {
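With auto-delete on, a failing wait = TRUE run stops with a pointer to setAutoDeleteJob(FALSE) instead of leaving artifacts behind. A sketch of the foreach-level alternative, using the standard .errorhandling argument:

# drop failed iterations rather than stopping, so surviving
# results still come back when some tasks error out
results <- foreach(i = 1:10, .errorhandling = "remove") %dopar% {
  if (i == 5) stop("simulated task failure") else i
}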
144 changes: 132 additions & 12 deletions R/jobUtilities.R
@@ -19,7 +19,9 @@ getJob <- function(jobId, verbose = TRUE) {
list(
chunkSize = 1,
enableCloudCombine = "TRUE",
packages = ""
packages = "",
errorHandling = "stop",
wait = "TRUE"
)

if (!is.null(job$metadata)) {
@@ -37,6 +39,10 @@
fill = TRUE)
cat(sprintf("\tpackages: %s", metadata$packages),
fill = TRUE)
cat(sprintf("\terrorHandling: %s", metadata$errorHandling),
fill = TRUE)
cat(sprintf("\twait: %s", metadata$wait),
fill = TRUE)
}

taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId)
@@ -63,11 +69,13 @@
),
fill = TRUE
)
cat(sprintf("\njob state: %s", job$state), fill = TRUE)
}

jobObj <- list(jobId = job$id,
metadata = metadata,
tasks = tasks)
tasks = tasks,
jobState = job$state)

return(jobObj)
}
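
getJob now surfaces errorHandling, wait, and the Batch job state alongside the task counts. A usage sketch against a hypothetical job id:

job <- getJob("job20171205", verbose = FALSE)
job$jobState         # e.g. "active" or "completed"
job$metadata$wait    # "TRUE" or "FALSE", as recorded in job metadata
job$tasks$failed     # count of failed tasks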
@@ -160,20 +168,132 @@ getJobResult <- function(jobId) {
stop("jobId must contain at least 3 characters.")
}

tempFile <- tempFile <- tempfile("getJobResult", fileext = ".rds")
metadata <- readMetadataBlob(jobId)

results <- rAzureBatch::downloadBlob(
jobId,
paste0("result/", jobId, "-merge-result.rds"),
downloadPath = tempFile,
overwrite = TRUE
)
if (!is.null(metadata)) {
if (metadata$enableCloudCombine == "FALSE") {
cat("enalbeCloudCombine is set to FALSE, no job merge result is available",
fill = TRUE)

return()
}

if (is.vector(results)) {
results <- readRDS(tempFile)
if (metadata$wait == "FALSE") {
job <- getJob(jobId, verbose = FALSE)

if (job$jobState == "active") {
stop(sprintf(
"job %s is not finished yet, please try again later",
job$jobId
))
} else if (job$jobState != "completed") {
stop(sprintf(
"job %s is %s state, no job result is available",
job$jobId,
job$jobState
))
}

# if the job has failed tasks
if (job$tasks$failed > 0) {
if (metadata$errorHandling == "stop") {
stop(
sprintf(
"job %s has failed tasks and error handling is set to 'stop', no result will be avaialble",
job$jobId
)
)
} else {
if (job$tasks$succeeded == 0) {
stop(sprintf(
"all tasks failed for job %s, no result will be avaialble",
job$jobId
))
}
}
}
}
}

return(results)
tempFile <- tempfile("getJobResult", fileext = ".rds")

retryCounter <- 0
maxRetryCount <- 3
repeat {
if (retryCounter > maxRetryCount) {
stop(
sprintf(
"Error getting job result: Maxmium number of retries (%d) reached\r\n%s",
maxRetryCount,
paste0(results, "\r\n")
)
)
} else {
retryCounter <- retryCounter + 1
}

results <- rAzureBatch::downloadBlob(
jobId,
paste0("result/", jobId, "-merge-result.rds"),
downloadPath = tempFile,
overwrite = TRUE
)

if (is.vector(results)) {
results <- readRDS(tempFile)
return(results)
}

# wait 5 seconds before retrying the download
Sys.sleep(5)
}
}
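
Together with the metadata and state checks above, this completes the long-running job flow: submit with wait = FALSE, poll, fetch, and clean up by hand. A sketch, assuming foreach returns the job id when wait = FALSE:

jobId <- foreach(i = 1:100,
                 .options.azure = list(wait = FALSE)) %dopar% { i ^ 2 }

# once getJob(jobId) reports the "completed" state:
results <- getJobResult(jobId)

# wait = FALSE jobs are not auto-deleted, so clean up explicitly
deleteJob(jobId)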

#' Delete a job
#'
#' @param jobId A job id
#'
#' @examples
#' \dontrun{
#' deleteJob("job-001")
#' }
#' @export
deleteJob <- function(jobId) {
deleteStorageContainer(jobId)

response <- rAzureBatch::deleteJob(jobId, content = "response")

if (response$status_code == 202) {
cat(sprintf("Your job '%s' has been deleted.", jobId),
fill = TRUE)
} else if (response$status_code == 404) {
cat(sprintf("Job '%s' does not exist.", jobId),
fill = TRUE)
}
}

#' Terminate a job
#'
#' @param jobId A job id
#'
#' @examples
#' \dontrun{
#' terminateJob("job-001")
#' }
#' @export
terminateJob <- function(jobId) {
response <- rAzureBatch::terminateJob(jobId, content = "response")

if (response$status_code == 202) {
cat(sprintf("Your job '%s' has been terminated.", jobId),
fill = TRUE)
} else if (response$status_code == 404) {
cat(sprintf("Job '%s' does not exist.", jobId),
fill = TRUE)
} else if (response$status_code == 409) {
cat(sprintf("Job '%s' has already completed.", jobId),
fill = TRUE)
}
}

#' Wait for current tasks to complete
5 changes: 4 additions & 1 deletion R/storage_management.R
@@ -39,7 +39,10 @@ deleteStorageContainer <- function(container) {
rAzureBatch::deleteContainer(container, content = "response")

if (response$status_code == 202) {
cat(sprintf("Your container '%s' has been deleted.", container),
cat(sprintf("Your storage container '%s' has been deleted.", container),
fill = TRUE)
} else if (response$status_code == 404) {
cat(sprintf("storage container '%s' does not exist.", container),
fill = TRUE)
}
