Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit cbbe32b

Browse files
authored
Feature/longrunjob, long running job improvement, add deleteJob and terminateJob (#174)
* add documentation and sample for long running job * update sample file name * update long running job doc and test * update metadata code * add errorHandling to job metadata * add deleteJob to delete both job definition and job result * styling fix * save foreach wait setting to metadata * implement retry logic in getjobresult * add terminateJob * handle various corner cases * regenerate document * add job state in getJob * do not fail getJobResult if getMetadata failed for backward compatibility * add deleteJob option to foreach, by default it is true for wait=TRUE job * styling fix * update version and changelog * address review feedback * add setJobAutoDelete function * rename jobAutoDelete to autoDeleteJob to workaround R bugs and update docs * update faq * fix styling issues * more styling fix * roll back manual update to DESCRIPTION * add namespace to api call
1 parent 8f90cd9 commit cbbe32b

16 files changed

+440
-45
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
# Change Log
2-
## [0.6.1] 2017-11-13
2+
3+
## [0.6.1] 2017-12-05
34
### Added
45
- Support for users to use programmatically generated credentials and cluster config
6+
- Support for users to delete job and terminate job
7+
### Changed
8+
- [BREAKING CHANGE] when wait = TRUE, both job and job results are deleted at the end of the run, set autoDeleteJob to FALSE to keep them
9+
- Add retry to get job result
10+
- Add errorHandling and wait option to job metadata
11+
- Save job metadata to job result storage blob
512

613
## [0.6.0] 2017-11-03
714
### Added

NAMESPACE

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Generated by roxygen2: do not edit by hand
22

33
export(createOutputFile)
4+
export(deleteJob)
45
export(deleteStorageContainer)
56
export(deleteStorageFile)
67
export(generateClusterConfig)
@@ -16,11 +17,13 @@ export(listStorageFiles)
1617
export(makeCluster)
1718
export(registerDoAzureParallel)
1819
export(resizeCluster)
20+
export(setAutoDeleteJob)
1921
export(setChunkSize)
2022
export(setCredentials)
2123
export(setHttpTraffic)
2224
export(setReduce)
2325
export(setVerbose)
2426
export(stopCluster)
27+
export(terminateJob)
2528
export(waitForNodesToComplete)
2629
export(waitForTasksToComplete)

R/doAzureParallel.R

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ setChunkSize <- function(value = 1) {
7373
assign("chunkSize", value, envir = .doAzureBatchGlobals)
7474
}
7575

76+
#' Specify whether to delete job and its result after asynchronous job is completed.
77+
#'
78+
#' @param value boolean of TRUE or FALSE
79+
#'
80+
#' @examples
81+
#' setAutoDeleteJob(FALSE)
82+
#' @export
83+
setAutoDeleteJob <- function(value = TRUE) {
84+
if (!is.logical(value))
85+
stop("setAutoDeleteJob requires a boolean argument")
86+
87+
assign("autoDeleteJob", value, envir = .doAzureBatchGlobals)
88+
}
89+
7690
#' Apply reduce function on a group of iterations of the foreach loop together per task.
7791
#'
7892
#' @param fun The number of iterations to group
@@ -232,6 +246,18 @@ setHttpTraffic <- function(value = FALSE) {
232246
wait <- obj$options$azure$wait
233247
}
234248

249+
# by default, delete both job and job result after synchronous job is completed
250+
autoDeleteJob <- TRUE
251+
252+
if (exists("autoDeleteJob", envir = .doAzureBatchGlobals)) {
253+
autoDeleteJob <- get("autoDeleteJob", envir = .doAzureBatchGlobals)
254+
}
255+
256+
if (!is.null(obj$options$azure$autoDeleteJob) &&
257+
is.logical(obj$options$azure$autoDeleteJob)) {
258+
autoDeleteJob <- obj$options$azure$autoDeleteJob
259+
}
260+
235261
inputs <- FALSE
236262
if (!is.null(obj$options$azure$inputs)) {
237263
storageCredentials <- rAzureBatch::getStorageCredentials()
@@ -282,6 +308,10 @@ setHttpTraffic <- function(value = FALSE) {
282308

283309
chunkSize <- 1
284310

311+
if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
312+
chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
313+
}
314+
285315
if (!is.null(obj$options$azure$chunkSize)) {
286316
chunkSize <- obj$options$azure$chunkSize
287317
}
@@ -290,25 +320,34 @@ setHttpTraffic <- function(value = FALSE) {
290320
chunkSize <- obj$options$azure$chunksize
291321
}
292322

293-
if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
294-
chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
295-
}
296-
297323
chunkSizeKeyValuePair <-
298324
list(name = "chunkSize", value = as.character(chunkSize))
299325

300-
if (is.null(obj$packages)) {
301-
metadata <-
302-
list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
303-
} else {
326+
metadata <-
327+
list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
328+
329+
if (!is.null(obj$packages)) {
304330
packagesKeyValuePair <-
305331
list(name = "packages",
306332
value = paste(obj$packages, collapse = ";"))
307333

308-
metadata <-
309-
list(enableCloudCombineKeyValuePair,
310-
chunkSizeKeyValuePair,
311-
packagesKeyValuePair)
334+
metadata[[length(metadata) + 1]] <- packagesKeyValuePair
335+
}
336+
337+
if (!is.null(obj$errorHandling)) {
338+
errorHandlingKeyValuePair <-
339+
list(name = "errorHandling",
340+
value = as.character(obj$errorHandling))
341+
342+
metadata[[length(metadata) + 1]] <- errorHandlingKeyValuePair
343+
}
344+
345+
if (!is.null(obj$options$azure$wait)) {
346+
waitKeyValuePair <-
347+
list(name = "wait",
348+
value = as.character(obj$options$azure$wait))
349+
350+
metadata[[length(metadata) + 1]] <- waitKeyValuePair
312351
}
313352

314353
retryCounter <- 0
@@ -454,6 +493,10 @@ setHttpTraffic <- function(value = FALSE) {
454493
job <- rAzureBatch::getJob(id)
455494
cat(sprintf("Id: %s", job$id), fill = TRUE)
456495

496+
if (!is.null(job$id)) {
497+
saveMetadataBlob(job$id, metadata)
498+
}
499+
457500
ntasks <- length(argsList)
458501

459502
startIndices <- seq(1, length(argsList), chunkSize)
@@ -542,7 +585,7 @@ setHttpTraffic <- function(value = FALSE) {
542585

543586
numberOfFailedTasks <- sum(unlist(failTasks))
544587

545-
if (numberOfFailedTasks > 0) {
588+
if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) {
546589
.createErrorViewerPane(id, failTasks)
547590
}
548591

@@ -563,13 +606,22 @@ setHttpTraffic <- function(value = FALSE) {
563606
cat(sprintf("Number of errors: %i", numberOfFailedTasks),
564607
fill = TRUE)
565608

566-
rAzureBatch::deleteJob(id)
609+
# delete job from batch service and job result from storage blob
610+
if (autoDeleteJob) {
611+
deleteJob(id)
612+
}
567613

568614
if (identical(obj$errorHandling, "stop") &&
569615
!is.null(errorValue)) {
570-
msg <- sprintf("task %d failed - '%s'",
571-
errorIndex,
572-
conditionMessage(errorValue))
616+
msg <-
617+
sprintf(
618+
paste0(
619+
"task %d failed - '%s'.\r\nBy default job and its result is deleted after run is over, use",
620+
" setAutoDeleteJob(FALSE) or autoDeleteJob = FALSE option to keep them for investigation."
621+
),
622+
errorIndex,
623+
conditionMessage(errorValue)
624+
)
573625
stop(simpleError(msg, call = expr))
574626
}
575627
else {

R/jobUtilities.R

Lines changed: 132 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ getJob <- function(jobId, verbose = TRUE) {
1919
list(
2020
chunkSize = 1,
2121
enableCloudCombine = "TRUE",
22-
packages = ""
22+
packages = "",
23+
errorHandling = "stop",
24+
wait = "TRUE"
2325
)
2426

2527
if (!is.null(job$metadata)) {
@@ -37,6 +39,10 @@ getJob <- function(jobId, verbose = TRUE) {
3739
fill = TRUE)
3840
cat(sprintf("\tpackages: %s", metadata$packages),
3941
fill = TRUE)
42+
cat(sprintf("\terrorHandling: %s", metadata$errorHandling),
43+
fill = TRUE)
44+
cat(sprintf("\twait: %s", metadata$wait),
45+
fill = TRUE)
4046
}
4147

4248
taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId)
@@ -63,11 +69,13 @@ getJob <- function(jobId, verbose = TRUE) {
6369
),
6470
fill = TRUE
6571
)
72+
cat(sprintf("\njob state: %s", job$state), fill = TRUE)
6673
}
6774

6875
jobObj <- list(jobId = job$id,
6976
metadata = metadata,
70-
tasks = tasks)
77+
tasks = tasks,
78+
jobState = job$state)
7179

7280
return(jobObj)
7381
}
@@ -160,20 +168,132 @@ getJobResult <- function(jobId) {
160168
stop("jobId must contain at least 3 characters.")
161169
}
162170

163-
tempFile <- tempFile <- tempfile("getJobResult", fileext = ".rds")
171+
metadata <- readMetadataBlob(jobId)
164172

165-
results <- rAzureBatch::downloadBlob(
166-
jobId,
167-
paste0("result/", jobId, "-merge-result.rds"),
168-
downloadPath = tempFile,
169-
overwrite = TRUE
170-
)
173+
if (!is.null(metadata)) {
174+
if (metadata$enableCloudCombine == "FALSE") {
175+
cat("enalbeCloudCombine is set to FALSE, no job merge result is available",
176+
fill = TRUE)
177+
178+
return()
179+
}
180+
181+
if (metadata$wait == "FALSE") {
182+
job <- getJob(jobId, verbose = FALSE)
183+
184+
if (job$jobState == "active") {
185+
stop(sprintf(
186+
"job %s is not finished yet, please try again later",
187+
job$jobId
188+
))
189+
} else if (job$jobState != "completed") {
190+
stop(sprintf(
191+
"job %s is in %s state, no job result is available",
192+
job$jobId,
193+
job$jobState
194+
))
195+
}
196+
197+
# if the job has failed task
198+
if (job$tasks$failed > 0) {
199+
if (metadata$errorHandling == "stop") {
200+
stop(
201+
sprintf(
202+
"job %s has failed tasks and error handling is set to 'stop', no result will be available",
203+
job$jobId
204+
)
205+
)
206+
} else {
207+
if (job$tasks$succeeded == 0) {
208+
stop(sprintf(
209+
"all tasks failed for job %s, no result will be available",
210+
job$jobId
211+
))
212+
}
213+
}
214+
}
215+
}
216+
}
217+
218+
tempFile <- tempfile("getJobResult", fileext = ".rds")
219+
220+
retryCounter <- 0
221+
maxRetryCount <- 3
222+
repeat {
223+
if (retryCounter > maxRetryCount) {
224+
stop(
225+
sprintf(
226+
"Error getting job result: Maximum number of retries (%d) reached\r\n%s",
227+
maxRetryCount,
228+
paste0(results, "\r\n")
229+
)
230+
)
231+
} else {
232+
retryCounter <- retryCounter + 1
233+
}
234+
235+
results <- rAzureBatch::downloadBlob(
236+
jobId,
237+
paste0("result/", jobId, "-merge-result.rds"),
238+
downloadPath = tempFile,
239+
overwrite = TRUE
240+
)
171241

172-
if (is.vector(results)) {
173-
results <- readRDS(tempFile)
242+
if (is.vector(results)) {
243+
results <- readRDS(tempFile)
244+
return(results)
245+
}
246+
247+
# wait for 5 seconds for the result to be available
248+
Sys.sleep(5)
174249
}
250+
}
175251

176-
return(results)
252+
#' Delete a job
253+
#'
254+
#' @param jobId A job id
255+
#'
256+
#' @examples
257+
#' \dontrun{
258+
#' deleteJob("job-001")
259+
#' }
260+
#' @export
261+
deleteJob <- function(jobId) {
262+
deleteStorageContainer(jobId)
263+
264+
response <- rAzureBatch::deleteJob(jobId, content = "response")
265+
266+
if (response$status_code == 202) {
267+
cat(sprintf("Your job '%s' has been deleted.", jobId),
268+
fill = TRUE)
269+
} else if (response$status_code == 404) {
270+
cat(sprintf("Job '%s' does not exist.", jobId),
271+
fill = TRUE)
272+
}
273+
}
274+
275+
#' Terminate a job
276+
#'
277+
#' @param jobId A job id
278+
#'
279+
#' @examples
280+
#' \dontrun{
281+
#' terminateJob("job-001")
282+
#' }
283+
#' @export
284+
terminateJob <- function(jobId) {
285+
response <- rAzureBatch::terminateJob(jobId, content = "response")
286+
287+
if (response$status_code == 202) {
288+
cat(sprintf("Your job '%s' has been terminated.", jobId),
289+
fill = TRUE)
290+
} else if (response$status_code == 404) {
291+
cat(sprintf("Job '%s' does not exist.", jobId),
292+
fill = TRUE)
293+
} else if (response$status_code == 409) {
294+
cat(sprintf("Job '%s' has already completed.", jobId),
295+
fill = TRUE)
296+
}
177297
}
178298

179299
#' Wait for current tasks to complete

R/storage_management.R

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ deleteStorageContainer <- function(container) {
3939
rAzureBatch::deleteContainer(container, content = "response")
4040

4141
if (response$status_code == 202) {
42-
cat(sprintf("Your container '%s' has been deleted.", container),
42+
cat(sprintf("Your storage container '%s' has been deleted.", container),
43+
fill = TRUE)
44+
} else if (response$status_code == 404) {
45+
cat(sprintf("Storage container '%s' does not exist.", container),
4346
fill = TRUE)
4447
}
4548

0 commit comments

Comments
 (0)