From 5f58eb5459739fdc60d892ac564d97ed3cd7b749 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 4 Oct 2017 16:09:48 -0700 Subject: [PATCH 01/25] add documentation and sample for long running job --- docs/31-long-running-job.md | 83 +++++++++++++++++++++ docs/README.md | 4 + samples/long_running_job/long-running-job.R | 68 +++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 docs/31-long-running-job.md create mode 100644 samples/long_running_job/long-running-job.R diff --git a/docs/31-long-running-job.md b/docs/31-long-running-job.md new file mode 100644 index 00000000..c54ad555 --- /dev/null +++ b/docs/31-long-running-job.md @@ -0,0 +1,83 @@ +# Long Running Job Management + +The doAzureParallel package allows you to manage long running jobs easily. There are 2 ways to run a job: +- Synchronous +- Asynchronous + +Long running job should run in asynchronous mode. + +## How to configure a job to run asynchronously +You can configure a job to run asynchronously by specifying wait = FALSE in job options: + +```R + options <- list(wait = FALSE) + jobId <- foreach(i = 1:number_of_iterations, .options.azure = options) %dopar% { ... } +``` +The returned value is the job ID associated with the foreach loop. Use this returned value you can get job status and job result. + +## Get job status + +getJob returns job metadata, such as chunk size, whether cloud combine is enabled, and packages specified for the job, it also returns task acount in different state + +```R + getJob(jobId) + getJob(jobId, verbose = TRUE) + + sample output: + -------------- + job metadata: + chunkSize: 1 + enableCloudCombine: TRUE + packages: httr + + tasks: + active: 1 + running: 0 + completed: 5 + succeeded: 0 + failed: 5 + total: 6 +``` + +## Get job list +You can use getJobList() to get a summary of all jobs. 
+ +```R + getJobList() + + sample output: + -------------- + Id State Status FailedTasks TotalTasks +1 job11 active No tasks in the job 0 0 +2 job20170714215517 active 0 % 0 6 +3 job20170714220129 active 0 % 0 6 +4 job20170714221557 active 84 % 4 6 +5 job20170803210552 active 0 % 0 6 +6 job20170803212205 active 0 % 0 6 +7 job20170803212558 active 0 % 0 6 +8 job20170714211502 completed 100 % 5 6 +9 job20170714223236 completed 100 % 0 6 +``` + +You can also filter job list by job state such as Active or Completed +```R + filter <- filter <- list() + filter$state <- c("active", "completed") + getJobList(filter) +``` + +## Retrieve long running job result +Once job is completed successfully, you can call getJobResult to retrieve the job result: + +```R + jobResult <- getJobResult(jobId) +``` + +### Clean up + +Once you get the job result, you can delete the job. +```R + rAzureBatch::deleteJob(jobId) +``` + +A [working sample](../samples/long_running_job/long_running_job.R) can be found in the samples directory. diff --git a/docs/README.md b/docs/README.md index fec31aeb..853caa93 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,6 +37,10 @@ This section will provide information about how Azure works, how best to take ad Setting up your cluster to user's specific needs +9. **Long Running Job** [(link)](./31-long-running-job.md) + + Best practices for managing long running jobs + ## Additional Documentation Take a look at our [**Troubleshooting Guide**](./40-troubleshooting.md) for information on how to diagnose common issues. 
diff --git a/samples/long_running_job/long-running-job.R b/samples/long_running_job/long-running-job.R new file mode 100644 index 00000000..e2002be8 --- /dev/null +++ b/samples/long_running_job/long-running-job.R @@ -0,0 +1,68 @@ +# ============= +# === Setup === +# ============= + +# install packages +library(devtools) +install_github("azure/razurebatch") +install_github("azure/doazureparallel") + +# import the doAzureParallel library and its dependencies +library(doAzureParallel) + +credentialsFileName <- "credentials.json" +clusterFileName <- "cluster.json" + +# generate a credentials json file +generateCredentialsConfig(credentialsFileName) + +# set your credentials +setCredentials(credentialsFileName) + +# generate a cluster config file +generateClusterConfig(clusterFileName) + +# Create your cluster if not exist +cluster <- makeCluster(clusterFileName) + +# register your parallel backend +registerDoAzureParallel(cluster) + +# check that your workers are up +getDoParWorkers() + +# ======================================================= +# === Create long running job and get progress/result === +# ======================================================= + +options <- list(wait = FALSE) +'%dopar%' <- foreach::'%dopar%' +jobId <- + foreach::foreach( + i = 1:4, + .packages = c('httr'), + .options.azure = opt + ) %dopar% { + mean(1:3) + } + +job <- getJob(jobId) + +# get active/running job list +filter <- filter <- list() +filter$state <- c("active", "completed") +getJobList(filter) + +# get job list for all jobs +getJobList() + +# wait 2 minutes for long running job to finish +Sys.sleep(120) + +# get job result +jobResult <- getJobResult(jobId) + +doAzureParallel::stopCluster(cluster) + +# delete the job +rAzureBatch::deleteJob(jobId) From 91faa5f91682815623f1255416b18d9ac6e0c28b Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 4 Oct 2017 16:15:17 -0700 Subject: [PATCH 02/25] update sample file name --- docs/31-long-running-job.md | 2 +- 
.../long_running_job/{long-running-job.R => long_running_job.R} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename samples/long_running_job/{long-running-job.R => long_running_job.R} (100%) diff --git a/docs/31-long-running-job.md b/docs/31-long-running-job.md index c54ad555..52cd411d 100644 --- a/docs/31-long-running-job.md +++ b/docs/31-long-running-job.md @@ -59,7 +59,7 @@ You can use getJobList() to get a summary of all jobs. 9 job20170714223236 completed 100 % 0 6 ``` -You can also filter job list by job state such as Active or Completed +You can also filter job list by job state such as active or completed ```R filter <- filter <- list() filter$state <- c("active", "completed") diff --git a/samples/long_running_job/long-running-job.R b/samples/long_running_job/long_running_job.R similarity index 100% rename from samples/long_running_job/long-running-job.R rename to samples/long_running_job/long_running_job.R From ea0d3e9546b7256441e6172026b4315ae06aa0a1 Mon Sep 17 00:00:00 2001 From: zfengms Date: Thu, 5 Oct 2017 12:24:36 -0700 Subject: [PATCH 03/25] update long running job doc and test --- docs/31-long-running-job.md | 10 ++++++++-- tests/testthat/test-long-running-job.R | 12 ++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/31-long-running-job.md b/docs/31-long-running-job.md index 52cd411d..58bd383d 100644 --- a/docs/31-long-running-job.md +++ b/docs/31-long-running-job.md @@ -13,11 +13,17 @@ You can configure a job to run asynchronously by specifying wait = FALSE in job options <- list(wait = FALSE) jobId <- foreach(i = 1:number_of_iterations, .options.azure = options) %dopar% { ... } ``` -The returned value is the job ID associated with the foreach loop. Use this returned value you can get job status and job result. +The returned value is the job Id associated with the foreach loop. Use this returned value you can get job status and job result. 
+ +You can optionally specify the job Id in options as shown below: +```R + options <- list(wait = FALSE, job = 'myjob') + foreach(i = 1:number_of_iterations, .options.azure = options) %dopar% { ... } +``` ## Get job status -getJob returns job metadata, such as chunk size, whether cloud combine is enabled, and packages specified for the job, it also returns task acount in different state +getJob returns job metadata, such as chunk size, whether cloud combine is enabled, and packages specified for the job, it also returns task counts in different state ```R getJob(jobId) diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R index da2d9d2a..97218f55 100644 --- a/tests/testthat/test-long-running-job.R +++ b/tests/testthat/test-long-running-job.R @@ -14,18 +14,18 @@ test_that("Long Running Job Test", { cluster <- doAzureParallel::makeCluster(clusterFileName) doAzureParallel::registerDoAzureParallel(cluster) - opt <- list(wait = FALSE) + options <- list(wait = FALSE, job = 'myjob') '%dopar%' <- foreach::'%dopar%' - res <- + jobId <- foreach::foreach( i = 1:4, .packages = c('httr'), - .options.azure = opt + .options.azure = options ) %dopar% { mean(1:3) } - job <- getJob(res) + job <- getJob(jobId) # get active/running job list filter <- filter <- list() @@ -39,7 +39,7 @@ test_that("Long Running Job Test", { Sys.sleep(120) # get job result - jobResult <- getJobResult(res) + jobResult <- getJobResult(jobId) doAzureParallel::stopCluster(cluster) @@ -51,5 +51,5 @@ test_that("Long Running Job Test", { list(2, 2, 2, 2)) # delete the job - rAzureBatch::deleteJob(res) + rAzureBatch::deleteJob(jobId) }) From a4f4b78365958cca0a126076411b288e81e6c024 Mon Sep 17 00:00:00 2001 From: zfengms Date: Fri, 13 Oct 2017 13:48:40 -0700 Subject: [PATCH 04/25] update metadata code --- R/doAzureParallel.R | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 583a1900..4a903b4d 
100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -360,18 +360,15 @@ setHttpTraffic <- function(value = FALSE) { chunkSizeKeyValuePair <- list(name = "chunkSize", value = as.character(chunkSize)) - if (is.null(obj$packages)) { - metadata <- - list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair) - } else { + metadata <- + list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair) + + if (!is.null(obj$packages)) { packagesKeyValuePair <- list(name = "packages", value = paste(obj$packages, collapse = ";")) - metadata <- - list(enableCloudCombineKeyValuePair, - chunkSizeKeyValuePair, - packagesKeyValuePair) + metadata[[3]] <-packagesKeyValuePair } response <- .addJob( From 2106f23c43c1b5fee26157f70a5118bd4fb86409 Mon Sep 17 00:00:00 2001 From: zfengms Date: Fri, 13 Oct 2017 15:49:53 -0700 Subject: [PATCH 05/25] add errorHandling to job metadata --- R/doAzureParallel.R | 13 ++++++++++++- R/utility.R | 5 ++++- tests/testthat/test-long-running-job.R | 4 +++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 4a903b4d..d3f9450c 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -363,12 +363,23 @@ setHttpTraffic <- function(value = FALSE) { metadata <- list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair) + metadataCount <- 3 if (!is.null(obj$packages)) { packagesKeyValuePair <- list(name = "packages", value = paste(obj$packages, collapse = ";")) - metadata[[3]] <-packagesKeyValuePair + metadata[[metadataCount]] <-packagesKeyValuePair + metadataCount <- metadataCount + 1 + } + + if (!is.null(obj$errorHandling)) { + errorHandlingKeyValuePair <- + list(name = "errorHandling", + value = as.character(obj$errorHandling)) + + metadata[[metadataCount]] <- errorHandlingKeyValuePair + metadataCount <- metadataCount + 1 } response <- .addJob( diff --git a/R/utility.R b/R/utility.R index 576c2c44..994655cd 100644 --- a/R/utility.R +++ b/R/utility.R @@ -149,7 +149,8 @@ getJob <- 
function(jobId, verbose = TRUE) { list( chunkSize = 1, enableCloudCombine = "TRUE", - packages = "" + packages = "", + errorHandling = "stop" ) if (!is.null(job$metadata)) { @@ -167,6 +168,8 @@ getJob <- function(jobId, verbose = TRUE) { fill = TRUE) cat(sprintf("\tpackages: %s", metadata$packages), fill = TRUE) + cat(sprintf("\terrorHandling: %s", metadata$errorHandling), + fill = TRUE) } taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId) diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R index 97218f55..3056cbb4 100644 --- a/tests/testthat/test-long-running-job.R +++ b/tests/testthat/test-long-running-job.R @@ -14,12 +14,14 @@ test_that("Long Running Job Test", { cluster <- doAzureParallel::makeCluster(clusterFileName) doAzureParallel::registerDoAzureParallel(cluster) - options <- list(wait = FALSE, job = 'myjob') + #options <- list(wait = FALSE, job = 'myjob') + options <- list(wait = FALSE) '%dopar%' <- foreach::'%dopar%' jobId <- foreach::foreach( i = 1:4, .packages = c('httr'), + .errorhandling = "stop", .options.azure = options ) %dopar% { mean(1:3) From 6c2fd92f3ba007d95a11d53086d8ee099d8d0f57 Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 24 Oct 2017 16:28:04 -0700 Subject: [PATCH 06/25] add deleteJob to delete both job defintion and job result --- NAMESPACE | 1 + R/doAzureParallel.R | 2 + R/storage_management.R | 5 ++- R/utility.R | 79 ++++++++++++++++++++++++++++++++++++- docs/31-long-running-job.md | 7 +++- man/deleteJob.Rd | 23 +++++++++++ 6 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 man/deleteJob.Rd diff --git a/NAMESPACE b/NAMESPACE index f10cbb06..03f79ce3 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,6 +1,7 @@ # Generated by roxygen2: do not edit by hand export(createOutputFile) +export(deleteJob) export(deleteStorageContainer) export(deleteStorageFile) export(generateClusterConfig) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index d3f9450c..6ba3ca77 100644 --- 
a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -399,6 +399,8 @@ setHttpTraffic <- function(value = FALSE) { stop("The specified job already exists.") } + saveMetadataBlob(id, metadata) + break }, diff --git a/R/storage_management.R b/R/storage_management.R index dd8b7eec..ca82bf64 100644 --- a/R/storage_management.R +++ b/R/storage_management.R @@ -39,7 +39,10 @@ deleteStorageContainer <- function(container) { rAzureBatch::deleteContainer(container, content = "response") if (response$status_code == 202) { - cat(sprintf("Your container '%s' has been deleted.", container), + cat(sprintf("Your storage container '%s' has been deleted.", container), + fill = TRUE) + } else if (response$status_code == 404) { + cat(sprintf("storage container '%s' does not exist.", container), fill = TRUE) } diff --git a/R/utility.R b/R/utility.R index 994655cd..3d549f81 100644 --- a/R/utility.R +++ b/R/utility.R @@ -56,6 +56,34 @@ linuxWrapCommands <- function(commands = c()) { commandLine } +#' Delete a job +#' +#' @param jobId A job id +#' @param deleteResult TRUE to delete job result in storage blob +#' container, FALSE to keep the result in storage blob container. 
+#' +#' @examples +#' \dontrun{ +#' deleteJob("job-001") +#' deleteJob("job-001", deleteResult = FALSE) +#' } +#' @export +deleteJob <- function(jobId, deleteResult = TRUE) { + if (deleteResult == TRUE) { + deleteStorageContainer(jobId) + } + + response <- rAzureBatch::deleteJob(jobId, content = "response") + + if (response$status_code == 202) { + cat(sprintf("Your job '%s' has been deleted.", jobId), + fill = TRUE) + } else if (response$status_code == 404) { + cat(sprintf("Job '%s' does not exist.", jobId), + fill = TRUE) + } +} + #' Get a list of job statuses from the given filter #' #' @param filter A filter containing job state @@ -351,7 +379,14 @@ getJobResult <- function(jobId) { stop("jobId must contain at least 3 characters.") } - tempFile <- tempFile <- tempfile("getJobResult", fileext = ".rds") + metadata <- readMetadataBlob(jobId) + + if (metadata$enableCloudCombine == "FALSE" ) { + cat("enalbeCloudCombine is set to FALSE, no job merge result is available", fill = TRUE) + return() + } + + tempFile <- tempfile("getJobResult", fileext = ".rds") results <- rAzureBatch::downloadBlob( jobId, @@ -573,6 +608,48 @@ getXmlValues <- function(xmlResponse, xmlPath) { xml2::xml_text(xml2::xml_find_all(xmlResponse, xmlPath)) } +saveMetadataBlob <- function(jobId, metadata) { + xmlNode <- "" + if (length(metadata) > 0) { + for (i in 1:length(metadata)) { + xmlNode <- + paste0(xmlNode, + sprintf("<%s>%s", metadata[[i]]$name, metadata[[i]]$value, metadata[[i]]$name)) + } + } + xmlNode <- paste0(xmlNode, "") + saveXmlBlob(jobId, xmlNode, "metadata") +} + +saveXmlBlob <- function(jobId, xmlBlock, name) { + xmlFile <- paste0(jobId, "-", name, ".rds") + saveRDS(xmlBlock, file = xmlFile) + rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", xmlFile)) + file.remove(xmlFile) +} + +readMetadataBlob <- function(jobId) { + # xmlResponse <- + # rAzureBatch::listBlobs(jobId , paste0(jobId, "-metadata.rds"), content = "parsed") + tempFile <- tempfile(paste0(jobId, "-metadata"), 
fileext = ".rds") + result <- rAzureBatch::downloadBlob( + jobId, + paste0(jobId, "-metadata.rds"), + downloadPath = tempFile, + overwrite = TRUE + ) + result <- readRDS(tempFile) + result <- xml2::as_xml_document(result) + chunkSize <- getXmlValues(result, ".//chunkSize") + packages <- getXmlValues(result, ".//packages") + errorHandling <- getXmlValues(result, ".//errorHandling") + enableCloudCombine <- getXmlValues(result, ".//enableCloudCombine") + + metadata <- list(chunkSize = chunkSize, packages = packages, errorHandling = errorHandling, enableCloudCombine = enableCloudCombine) + + metadata +} + areShallowEqual <- function(a, b) { !is.null(a) && !is.null(b) && a == b } diff --git a/docs/31-long-running-job.md b/docs/31-long-running-job.md index 58bd383d..d2272bab 100644 --- a/docs/31-long-running-job.md +++ b/docs/31-long-running-job.md @@ -83,7 +83,12 @@ Once job is completed successfully, you can call getJobResult to retrieve the jo Once you get the job result, you can delete the job. ```R - rAzureBatch::deleteJob(jobId) + deleteJob(jobId) +``` + +Please note deleteJob will delete the job at batch service, by default, it also deletes the storage container holding the job result. If you want to keep the job result around, you can set deleteResult parameter to FALSE +```R + deleteJob(jobId, deleteResult = FALSE) ``` A [working sample](../samples/long_running_job/long_running_job.R) can be found in the samples directory. 
diff --git a/man/deleteJob.Rd b/man/deleteJob.Rd new file mode 100644 index 00000000..2e46a4bf --- /dev/null +++ b/man/deleteJob.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utility.R +\name{deleteJob} +\alias{deleteJob} +\title{Delete a job} +\usage{ +deleteJob(jobId, deleteResult = TRUE) +} +\arguments{ +\item{jobId}{A job id} + +\item{deleteResult}{TRUE to delete job result in storage blob, +FALSE to keep the result in storage blob} +} +\description{ +Delete a job +} +\examples{ +\dontrun{ +deleteJob("job-001") +deleteJob("job-001", deleteResult = FALSE) +} +} From f7c5b272b3c4e858b2f60119215390cf0b08afbf Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 24 Oct 2017 17:37:43 -0700 Subject: [PATCH 07/25] styling fix --- R/doAzureParallel.R | 2 +- R/utility.R | 29 +++++++++++++++++++------- tests/testthat/test-long-running-job.R | 7 +++---- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 1e6b27df..ab92011c 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -291,7 +291,7 @@ setHttpTraffic <- function(value = FALSE) { list(name = "packages", value = paste(obj$packages, collapse = ";")) - metadata[[metadataCount]] <-packagesKeyValuePair + metadata[[metadataCount]] <- packagesKeyValuePair metadataCount <- metadataCount + 1 } diff --git a/R/utility.R b/R/utility.R index 3d549f81..c70f7b68 100644 --- a/R/utility.R +++ b/R/utility.R @@ -381,8 +381,9 @@ getJobResult <- function(jobId) { metadata <- readMetadataBlob(jobId) - if (metadata$enableCloudCombine == "FALSE" ) { - cat("enalbeCloudCombine is set to FALSE, no job merge result is available", fill = TRUE) + if (metadata$enableCloudCombine == "FALSE") { + cat("enalbeCloudCombine is set to FALSE, no job merge result is available", + fill = TRUE) return() } @@ -613,8 +614,15 @@ saveMetadataBlob <- function(jobId, metadata) { if (length(metadata) > 0) { for (i in 1:length(metadata)) { xmlNode 
<- - paste0(xmlNode, - sprintf("<%s>%s", metadata[[i]]$name, metadata[[i]]$value, metadata[[i]]$name)) + paste0( + xmlNode, + sprintf( + "<%s>%s", + metadata[[i]]$name, + metadata[[i]]$value, + metadata[[i]]$name + ) + ) } } xmlNode <- paste0(xmlNode, "") @@ -629,8 +637,6 @@ saveXmlBlob <- function(jobId, xmlBlock, name) { } readMetadataBlob <- function(jobId) { - # xmlResponse <- - # rAzureBatch::listBlobs(jobId , paste0(jobId, "-metadata.rds"), content = "parsed") tempFile <- tempfile(paste0(jobId, "-metadata"), fileext = ".rds") result <- rAzureBatch::downloadBlob( jobId, @@ -643,9 +649,16 @@ readMetadataBlob <- function(jobId) { chunkSize <- getXmlValues(result, ".//chunkSize") packages <- getXmlValues(result, ".//packages") errorHandling <- getXmlValues(result, ".//errorHandling") - enableCloudCombine <- getXmlValues(result, ".//enableCloudCombine") + enableCloudCombine <- + getXmlValues(result, ".//enableCloudCombine") - metadata <- list(chunkSize = chunkSize, packages = packages, errorHandling = errorHandling, enableCloudCombine = enableCloudCombine) + metadata <- + list( + chunkSize = chunkSize, + packages = packages, + errorHandling = errorHandling, + enableCloudCombine = enableCloudCombine + ) metadata } diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R index 3056cbb4..482126f9 100644 --- a/tests/testthat/test-long-running-job.R +++ b/tests/testthat/test-long-running-job.R @@ -14,14 +14,14 @@ test_that("Long Running Job Test", { cluster <- doAzureParallel::makeCluster(clusterFileName) doAzureParallel::registerDoAzureParallel(cluster) - #options <- list(wait = FALSE, job = 'myjob') - options <- list(wait = FALSE) + options <- list(wait = FALSE, + enableCloudCombine = TRUE) '%dopar%' <- foreach::'%dopar%' jobId <- foreach::foreach( i = 1:4, .packages = c('httr'), - .errorhandling = "stop", + .errorhandling = "remove", .options.azure = options ) %dopar% { mean(1:3) @@ -42,7 +42,6 @@ test_that("Long Running Job 
Test", { # get job result jobResult <- getJobResult(jobId) - doAzureParallel::stopCluster(cluster) # verify the job result is correct From 58de322202e11dd5561c579510099e94ceea0429 Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 24 Oct 2017 18:56:43 -0700 Subject: [PATCH 08/25] save foreach wait setting to metadata --- R/doAzureParallel.R | 9 +++++++++ R/utility.R | 9 +++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index ab92011c..ee101105 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -304,6 +304,15 @@ setHttpTraffic <- function(value = FALSE) { metadataCount <- metadataCount + 1 } + if (!is.null(obj$options$azure$wait)) { + waitKeyValuePair <- + list(name = "wait", + value = as.character(obj$options$azure$wait)) + + metadata[[metadataCount]] <- waitKeyValuePair + metadataCount <- metadataCount + 1 + } + retryCounter <- 0 maxRetryCount <- 5 startupFolderName <- "startup" diff --git a/R/utility.R b/R/utility.R index c70f7b68..1a18f03a 100644 --- a/R/utility.R +++ b/R/utility.R @@ -178,7 +178,8 @@ getJob <- function(jobId, verbose = TRUE) { chunkSize = 1, enableCloudCombine = "TRUE", packages = "", - errorHandling = "stop" + errorHandling = "stop", + wait = "TRUE" ) if (!is.null(job$metadata)) { @@ -198,6 +199,8 @@ getJob <- function(jobId, verbose = TRUE) { fill = TRUE) cat(sprintf("\terrorHandling: %s", metadata$errorHandling), fill = TRUE) + cat(sprintf("\twait: %s", metadata$wait), + fill = TRUE) } taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId) @@ -649,6 +652,7 @@ readMetadataBlob <- function(jobId) { chunkSize <- getXmlValues(result, ".//chunkSize") packages <- getXmlValues(result, ".//packages") errorHandling <- getXmlValues(result, ".//errorHandling") + wait <- getXmlValues(result, ".//wait") enableCloudCombine <- getXmlValues(result, ".//enableCloudCombine") @@ -657,7 +661,8 @@ readMetadataBlob <- function(jobId) { chunkSize = chunkSize, packages = packages, 
errorHandling = errorHandling, - enableCloudCombine = enableCloudCombine + enableCloudCombine = enableCloudCombine, + wait = wait ) metadata From b4248cffe0e055ae7c0c7196cf8c03cd472409ef Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 24 Oct 2017 19:33:43 -0700 Subject: [PATCH 09/25] implement retry logic in getjobresult --- R/utility.R | 58 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/R/utility.R b/R/utility.R index 1a18f03a..dfca25b0 100644 --- a/R/utility.R +++ b/R/utility.R @@ -387,20 +387,62 @@ getJobResult <- function(jobId) { if (metadata$enableCloudCombine == "FALSE") { cat("enalbeCloudCombine is set to FALSE, no job merge result is available", fill = TRUE) + return() } + if (metadata$wait == "FALSE") { + job <- getJob(jobId, verbose = FALSE) + + if (job$tasks$active != 0 && job$tasks$running != 0) { + stop(sprintf( + "job %s is not finished yet, please try again later", + job$jobId + )) + } + + # if the job has failed task + if (job$tasks$failed > 0 && metadata$errorHandling == "stop") { + stop( + sprintf( + "job %s has failed tasks and error handling is set to 'stop', no result will be avaialble", + job$jobId + ) + ) + } + } + tempFile <- tempfile("getJobResult", fileext = ".rds") - results <- rAzureBatch::downloadBlob( - jobId, - paste0("result/", jobId, "-merge-result.rds"), - downloadPath = tempFile, - overwrite = TRUE - ) + retryCounter <- 0 + maxRetryCount <- 3 + repeat { + if (retryCounter > maxRetryCount) { + stop( + sprintf( + "Error getting job result: Maxmium number of retries (%d) reached", + maxRetryCount + ) + ) + } else { + retryCounter <- retryCounter + 1 + } - if (is.vector(results)) { - results <- readRDS(tempFile) + results <- rAzureBatch::downloadBlob( + jobId, + paste0("result/", jobId, "-merge-result.rds"), + downloadPath = tempFile, + overwrite = TRUE + ) + + if (is.vector(results)) { + results <- readRDS(tempFile) + break + + } + + # wait for 10 seconds for the 
result to be available + Sys.sleep(10) } return(results) From 08e8943a971e21d440e96858d1438397c596403b Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 25 Oct 2017 16:33:08 -0700 Subject: [PATCH 10/25] add terminateJob --- NAMESPACE | 1 + R/utility.R | 24 ++++++++++++++++++++++++ man/deleteJob.Rd | 4 ++-- man/terminateJob.Rd | 19 +++++++++++++++++++ 4 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 man/terminateJob.Rd diff --git a/NAMESPACE b/NAMESPACE index 03f79ce3..6c9c2e27 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -23,5 +23,6 @@ export(setHttpTraffic) export(setReduce) export(setVerbose) export(stopCluster) +export(terminateJob) export(waitForNodesToComplete) export(waitForTasksToComplete) diff --git a/R/utility.R b/R/utility.R index dfca25b0..5d7f12ed 100644 --- a/R/utility.R +++ b/R/utility.R @@ -84,6 +84,30 @@ deleteJob <- function(jobId, deleteResult = TRUE) { } } +#' Terminate a job +#' +#' @param jobId A job id +#' +#' @examples +#' \dontrun{ +#' terminateJob("job-001") +#' } +#' @export +terminateJob <- function(jobId) { + response <- rAzureBatch::terminateJob(jobId, content = "response") + + if (response$status_code == 202) { + cat(sprintf("Your job '%s' has been terminated.", jobId), + fill = TRUE) + } else if (response$status_code == 404) { + cat(sprintf("Job '%s' does not exist.", jobId), + fill = TRUE) + } else if (response$status_code == 409) { + cat(sprintf("Job '%s' has already completed.", jobId), + fill = TRUE) + } +} + #' Get a list of job statuses from the given filter #' #' @param filter A filter containing job state diff --git a/man/deleteJob.Rd b/man/deleteJob.Rd index 2e46a4bf..8a58f838 100644 --- a/man/deleteJob.Rd +++ b/man/deleteJob.Rd @@ -9,8 +9,8 @@ deleteJob(jobId, deleteResult = TRUE) \arguments{ \item{jobId}{A job id} -\item{deleteResult}{TRUE to delete job result in storage blob, -FALSE to keep the result in storage blob} +\item{deleteResult}{TRUE to delete job result in storage blob +container, FALSE to keep 
the result in storage blob container.} } \description{ Delete a job diff --git a/man/terminateJob.Rd b/man/terminateJob.Rd new file mode 100644 index 00000000..d3dcdac2 --- /dev/null +++ b/man/terminateJob.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utility.R +\name{terminateJob} +\alias{terminateJob} +\title{Terminate a job} +\usage{ +terminateJob(jobId) +} +\arguments{ +\item{jobId}{A job id} +} +\description{ +Terminate a job +} +\examples{ +\dontrun{ +terminateJob("job-001") +} +} From 11f648493e54d8831f7f77b82ddfafe396c8e87d Mon Sep 17 00:00:00 2001 From: zfengms Date: Mon, 30 Oct 2017 19:14:45 -0700 Subject: [PATCH 11/25] handle various corner cases --- CHANGELOG.md | 8 +++++++ R/utility.R | 67 +++++++++++++++++++++++++++++++--------------------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3ff1801..d5e91ac8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,12 @@ # Change Log +## [0.5.2] 2017-10-30 +### Added +- Support for users to delete job and terminate job +### Changed +- Add retry to get job result +- Add errorHandling and wait option to job metadata +- Save job metadata to job result storage blob + ## [0.5.1] 2017-09-28 ### Added - Support for users to get job and job results for long running job diff --git a/R/utility.R b/R/utility.R index 5d7f12ed..1185339d 100644 --- a/R/utility.R +++ b/R/utility.R @@ -426,13 +426,24 @@ getJobResult <- function(jobId) { } # if the job has failed task - if (job$tasks$failed > 0 && metadata$errorHandling == "stop") { - stop( - sprintf( - "job %s has failed tasks and error handling is set to 'stop', no result will be avaialble", - job$jobId + if (job$tasks$failed > 0) { + if (metadata$errorHandling == "stop") { + stop( + sprintf( + "job %s has failed tasks and error handling is set to 'stop', no result will be avaialble", + job$jobId + ) ) - ) + } else { + if (job$tasks$succeeded == 0) { + stop( + 
sprintf( + "all tasks failed for job %s, no result will be avaialble", + job$jobId + ) + ) + } + } } } @@ -461,15 +472,12 @@ getJobResult <- function(jobId) { if (is.vector(results)) { results <- readRDS(tempFile) - break - + return(results) } # wait for 10 seconds for the result to be available Sys.sleep(10) } - - return(results) } #' Utility function for creating an output file @@ -713,25 +721,30 @@ readMetadataBlob <- function(jobId) { downloadPath = tempFile, overwrite = TRUE ) - result <- readRDS(tempFile) - result <- xml2::as_xml_document(result) - chunkSize <- getXmlValues(result, ".//chunkSize") - packages <- getXmlValues(result, ".//packages") - errorHandling <- getXmlValues(result, ".//errorHandling") - wait <- getXmlValues(result, ".//wait") - enableCloudCombine <- - getXmlValues(result, ".//enableCloudCombine") - metadata <- - list( - chunkSize = chunkSize, - packages = packages, - errorHandling = errorHandling, - enableCloudCombine = enableCloudCombine, - wait = wait - ) + if (is.vector(result)) { + result <- readRDS(tempFile) + result <- xml2::as_xml_document(result) + chunkSize <- getXmlValues(result, ".//chunkSize") + packages <- getXmlValues(result, ".//packages") + errorHandling <- getXmlValues(result, ".//errorHandling") + wait <- getXmlValues(result, ".//wait") + enableCloudCombine <- + getXmlValues(result, ".//enableCloudCombine") + + metadata <- + list( + chunkSize = chunkSize, + packages = packages, + errorHandling = errorHandling, + enableCloudCombine = enableCloudCombine, + wait = wait + ) - metadata + metadata + } else { + stop(paste0(result, "\r\n")) + } } areShallowEqual <- function(a, b) { From e40d99f2fdf35c14c3bf53c72286985b7084bcb2 Mon Sep 17 00:00:00 2001 From: zfengms Date: Fri, 17 Nov 2017 14:38:51 -0800 Subject: [PATCH 12/25] regenerate document --- man/deleteJob.Rd | 2 +- man/getJob.Rd | 2 +- man/getJobList.Rd | 2 +- man/getJobResult.Rd | 2 +- man/terminateJob.Rd | 2 +- man/waitForTasksToComplete.Rd | 2 +- 6 files changed, 6 
insertions(+), 6 deletions(-) diff --git a/man/deleteJob.Rd b/man/deleteJob.Rd index 8a58f838..3420e28a 100644 --- a/man/deleteJob.Rd +++ b/man/deleteJob.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utility.R +% Please edit documentation in R/jobUtilities.R \name{deleteJob} \alias{deleteJob} \title{Delete a job} diff --git a/man/getJob.Rd b/man/getJob.Rd index b42af61d..5113b368 100644 --- a/man/getJob.Rd +++ b/man/getJob.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utility.R +% Please edit documentation in R/jobUtilities.R \name{getJob} \alias{getJob} \title{Get a job for the given job id} diff --git a/man/getJobList.Rd b/man/getJobList.Rd index 75e7ea6b..93313b5b 100644 --- a/man/getJobList.Rd +++ b/man/getJobList.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utility.R +% Please edit documentation in R/jobUtilities.R \name{getJobList} \alias{getJobList} \title{Get a list of job statuses from the given filter} diff --git a/man/getJobResult.Rd b/man/getJobResult.Rd index 1e001f9e..fa03e026 100644 --- a/man/getJobResult.Rd +++ b/man/getJobResult.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utility.R +% Please edit documentation in R/jobUtilities.R \name{getJobResult} \alias{getJobResult} \title{Download the results of the job} diff --git a/man/terminateJob.Rd b/man/terminateJob.Rd index d3dcdac2..8f6aaedd 100644 --- a/man/terminateJob.Rd +++ b/man/terminateJob.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utility.R +% Please edit documentation in R/jobUtilities.R \name{terminateJob} \alias{terminateJob} \title{Terminate a job} diff --git a/man/waitForTasksToComplete.Rd b/man/waitForTasksToComplete.Rd index d2207324..47696d4b 100644 --- a/man/waitForTasksToComplete.Rd +++ b/man/waitForTasksToComplete.Rd @@ 
-1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utility.R +% Please edit documentation in R/jobUtilities.R \name{waitForTasksToComplete} \alias{waitForTasksToComplete} \title{Wait for current tasks to complete} From a64b9aa97a78d846cb56568f63db5e9b7c205a7c Mon Sep 17 00:00:00 2001 From: zfengms Date: Fri, 17 Nov 2017 14:56:01 -0800 Subject: [PATCH 13/25] add job state in getJob --- R/jobUtilities.R | 4 +++- tests/testthat/test-long-running-job.R | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/R/jobUtilities.R b/R/jobUtilities.R index 531983ed..ed8e7357 100644 --- a/R/jobUtilities.R +++ b/R/jobUtilities.R @@ -69,11 +69,13 @@ getJob <- function(jobId, verbose = TRUE) { ), fill = TRUE ) + cat(sprintf("\njob state: %s", job$state), fill = TRUE) } jobObj <- list(jobId = job$id, metadata = metadata, - tasks = tasks) + tasks = tasks, + jobState = job$state) return(jobObj) } diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R index 16e68fd7..e92684cd 100644 --- a/tests/testthat/test-long-running-job.R +++ b/tests/testthat/test-long-running-job.R @@ -53,5 +53,5 @@ test_that("Long Running Job Test", { list(2, 2, 2, 2)) # delete the job - rAzureBatch::deleteJob(jobId) + deleteJob(jobId) }) From 2afb05d2d08878e20d264f4d46f33685fd3f32b1 Mon Sep 17 00:00:00 2001 From: zfengms Date: Mon, 20 Nov 2017 17:02:25 -0800 Subject: [PATCH 14/25] do not fail getJobResult if getMetadata failed for backward compatibility --- R/jobUtilities.R | 71 +++++++++++++++++++++++++----------------------- R/utility.R | 4 +-- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/R/jobUtilities.R b/R/jobUtilities.R index b6a3373e..7fb22823 100644 --- a/R/jobUtilities.R +++ b/R/jobUtilities.R @@ -170,44 +170,46 @@ getJobResult <- function(jobId) { metadata <- readMetadataBlob(jobId) - if (metadata$enableCloudCombine == "FALSE") { - cat("enalbeCloudCombine is set to FALSE, no job merge result 
is available", - fill = TRUE) - - return() - } + if (!is.null(metadata)) { + if (metadata$enableCloudCombine == "FALSE") { + cat("enableCloudCombine is set to FALSE, no job merge result is available", + fill = TRUE) - if (metadata$wait == "FALSE") { - job <- getJob(jobId, verbose = FALSE) - - if (job$jobState == "active") { - stop(sprintf("job %s is not finished yet, please try again later", - job$jobId)) - } else if (job$jobState != "completed") { - stop(sprintf( - "job %s is %s state, no job result is available", - job$jobId, - job$jobState - )) + return() } - # if the job has failed task - if (job$tasks$failed > 0) { - if (metadata$errorHandling == "stop") { - stop( - sprintf( - "job %s has failed tasks and error handling is set to 'stop', no result will be avaialble", - job$jobId - ) - ) - } else { - if (job$tasks$succeeded == 0) { + if (metadata$wait == "FALSE") { + job <- getJob(jobId, verbose = FALSE) + + if (job$jobState == "active") { + stop(sprintf( + "job %s is not finished yet, please try again later", + job$jobId + )) + } else if (job$jobState != "completed") { + stop(sprintf( + "job %s is %s state, no job result is available", + job$jobId, + job$jobState + )) + } + + # if the job has failed task + if (job$tasks$failed > 0) { + if (metadata$errorHandling == "stop") { stop( sprintf( - "all tasks failed for job %s, no result will be avaialble", + "job %s has failed tasks and error handling is set to 'stop', no result will be available", job$jobId ) ) + } else { + if (job$tasks$succeeded == 0) { + stop(sprintf( + "all tasks failed for job %s, no result will be available", + job$jobId + )) + } } } } @@ -221,8 +223,9 @@ getJobResult <- function(jobId) { if (retryCounter > maxRetryCount) { stop( sprintf( - "Error getting job result: Maxmium number of retries (%d) reached", - maxRetryCount + "Error getting job result: Maximum number of retries (%d) reached\r\n%s", + maxRetryCount, + paste0(results, "\r\n") ) ) } else { @@ -241,8 +244,8 @@ getJobResult <- 
function(jobId) { return(results) } - # wait for 10 seconds for the result to be available - Sys.sleep(10) + # wait for 5 seconds for the result to be available + Sys.sleep(5) } } diff --git a/R/utility.R b/R/utility.R index 65da271a..6328407d 100644 --- a/R/utility.R +++ b/R/utility.R @@ -234,9 +234,9 @@ readMetadataBlob <- function(jobId) { wait = wait ) - metadata + return(metadata) } else { - stop(paste0(result, "\r\n")) + return(NULL) } } From 1ce98e2fe4d9684a14024239339e439d775db9ec Mon Sep 17 00:00:00 2001 From: zfengms Date: Mon, 20 Nov 2017 20:30:32 -0800 Subject: [PATCH 15/25] add deleteJob option to foreach, by default it is true for wait=TRUE job --- R/doAzureParallel.R | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 5b15d2cc..8fa15526 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -232,6 +232,11 @@ setHttpTraffic <- function(value = FALSE) { wait <- obj$options$azure$wait } + deleteJob <- TRUE + if (!is.null(obj$options$azure$deleteJob)) { + wait <- obj$options$azure$deleteJob + } + inputs <- FALSE if (!is.null(obj$options$azure$inputs)) { storageCredentials <- rAzureBatch::getStorageCredentials() @@ -584,7 +589,10 @@ setHttpTraffic <- function(value = FALSE) { cat(sprintf("Number of errors: %i", numberOfFailedTasks), fill = TRUE) - rAzureBatch::deleteJob(id) + # delete job from batch service and job result from storage blob + if(deleteJob) { + deleteJob(id) + } if (identical(obj$errorHandling, "stop") && !is.null(errorValue)) { From 43621857e03589cdfe56ef52aff1e361ed03776f Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 21 Nov 2017 19:16:12 -0800 Subject: [PATCH 16/25] styling fix --- R/doAzureParallel.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 8fa15526..6ca1c11f 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -590,7 +590,7 @@ setHttpTraffic <- function(value = FALSE) { fill 
= TRUE) # delete job from batch service and job result from storage blob - if(deleteJob) { + if (deleteJob) { deleteJob(id) } From 29bb5a5e0926a9d37050313cbccfd2eed02bb7d4 Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 21 Nov 2017 19:41:10 -0800 Subject: [PATCH 17/25] update version and changelog --- CHANGELOG.md | 1 + DESCRIPTION | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 270a7bd5..3e591535 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Added - Support for users to delete job and terminate job ### Changed +- [BREAKING CHANGE] when wait = TRUE, both job and job results are deleted at the end of the run - Add retry to get job result - Add errorHandling and wait option to job metadata - Save job metadata to job result storage blob diff --git a/DESCRIPTION b/DESCRIPTION index 9254f395..0ec4e325 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: doAzureParallel Type: Package Title: doAzureParallel -Version: 0.6.1 +Version: 0.6.2 Author: Brian Hoang Maintainer: Brian Hoang Description: The project is for data experts who use R at scale. 
The project From c7b045334a2f4e3efaa2423d3f6b041a29129960 Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 5 Dec 2017 12:51:50 -0800 Subject: [PATCH 18/25] address review feedback --- R/doAzureParallel.R | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 6ca1c11f..2aeb9976 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -232,6 +232,7 @@ setHttpTraffic <- function(value = FALSE) { wait <- obj$options$azure$wait } + # by default, delete both job and job result after job is completed (both synchronous and asynchronous) deleteJob <- TRUE if (!is.null(obj$options$azure$deleteJob)) { wait <- obj$options$azure$deleteJob @@ -305,14 +306,12 @@ setHttpTraffic <- function(value = FALSE) { metadata <- list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair) - metadataCount <- 3 if (!is.null(obj$packages)) { packagesKeyValuePair <- list(name = "packages", value = paste(obj$packages, collapse = ";")) - metadata[[metadataCount]] <- packagesKeyValuePair - metadataCount <- metadataCount + 1 + metadata[[length(metadata) + 1]] <- packagesKeyValuePair } if (!is.null(obj$errorHandling)) { @@ -320,8 +319,7 @@ setHttpTraffic <- function(value = FALSE) { list(name = "errorHandling", value = as.character(obj$errorHandling)) - metadata[[metadataCount]] <- errorHandlingKeyValuePair - metadataCount <- metadataCount + 1 + metadata[[length(metadata) + 1]] <- errorHandlingKeyValuePair } if (!is.null(obj$options$azure$wait)) { @@ -329,8 +327,7 @@ setHttpTraffic <- function(value = FALSE) { list(name = "wait", value = as.character(obj$options$azure$wait)) - metadata[[metadataCount]] <- waitKeyValuePair - metadataCount <- metadataCount + 1 + metadata[[length(metadata) + 1]] <- waitKeyValuePair } retryCounter <- 0 @@ -568,7 +565,7 @@ setHttpTraffic <- function(value = FALSE) { numberOfFailedTasks <- sum(unlist(failTasks)) - if (numberOfFailedTasks > 0) { + if (numberOfFailedTasks > 0 && deleteJob == FALSE) 
{ .createErrorViewerPane(id, failTasks) } From a9eec76f0fae41ab23614334d172b32b9f1e5837 Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 5 Dec 2017 19:34:12 -0800 Subject: [PATCH 19/25] add setJobAutoDelete function --- CHANGELOG.md | 10 +++--- NAMESPACE | 1 + R/doAzureParallel.R | 40 +++++++++++++++------ R/jobUtilities.R | 9 ++--- docs/31-long-running-job.md | 8 +++-- man/deleteJob.Rd | 6 +--- man/setJobAutoDelete.Rd | 17 +++++++++ samples/long_running_job/long_running_job.R | 2 +- tests/testthat/test-long-running-job.R | 2 +- 9 files changed, 62 insertions(+), 33 deletions(-) create mode 100644 man/setJobAutoDelete.Rd diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e591535..caaf9ab4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,17 +1,15 @@ # Change Log -## [0.6.2] 2017-11-17 + +## [0.6.1] 2017-12-05 ### Added +- Support for users to use programmatically generated credentials and cluster config - Support for users to delete job and terminate job ### Changed -- [BREAKING CHANGE] when wait = TRUE, both job and job results are deleted at the end of the run +- [BREAKING CHANGE] when wait = TRUE, both job and job results are deleted at the end of the run, set jobAutoComplete to FALSE to keep them - Add retry to get job result - Add errorHandling and wait option to job metadata - Save job metadata to job result storage blob -## [0.6.1] 2017-11-13 -### Added -- Support for users to use programmatically generated credentials and cluster config - ## [0.6.0] 2017-11-03 ### Added - Support for users to run custom versions of R via Docker containers diff --git a/NAMESPACE b/NAMESPACE index 6c9c2e27..0d9bb6fd 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -20,6 +20,7 @@ export(resizeCluster) export(setChunkSize) export(setCredentials) export(setHttpTraffic) +export(setJobAutoDelete) export(setReduce) export(setVerbose) export(stopCluster) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 2aeb9976..e153d7ab 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R 
@@ -73,6 +73,20 @@ setChunkSize <- function(value = 1) { assign("chunkSize", value, envir = .doAzureBatchGlobals) } +#' Specify whether to delete job/jobresult after asychronous job is completed. +#' +#' @param value boolean of TRUE or FALSE +#' +#' @examples +#' setJobAutoDelete(FALSE) +#' @export +setJobAutoDelete <- function(value = TRUE) { + if (!is.logical(value)) + stop("setJobAutoDelete requires a boolean argument") + + assign("jobAutoDelete", value, envir = .doAzureBatchGlobals) +} + #' Apply reduce function on a group of iterations of the foreach loop together per task. #' #' @param fun The number of iterations to group @@ -232,10 +246,16 @@ setHttpTraffic <- function(value = FALSE) { wait <- obj$options$azure$wait } - # by default, delete both job and job result after job is completed (both synchronous and asynchronous) - deleteJob <- TRUE - if (!is.null(obj$options$azure$deleteJob)) { - wait <- obj$options$azure$deleteJob + # by default, delete both job and job result after synchronous job is completed + jobAutoDelete <- TRUE + + if (exists("jobAutoDelete", envir = .doAzureBatchGlobals)) { + jobAutoDelete <- get("jobAutoDelete", envir = .doAzureBatchGlobals) + } + + if (!is.null(obj$options$azure$jobAutoDelete) && + is.logical(obj$options$azure$jobAutoDelete)) { + jobAutoDelete <- obj$options$azure$jobAutoDelete } inputs <- FALSE @@ -288,6 +308,10 @@ setHttpTraffic <- function(value = FALSE) { chunkSize <- 1 + if (exists("chunkSize", envir = .doAzureBatchGlobals)) { + chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals) + } + if (!is.null(obj$options$azure$chunkSize)) { chunkSize <- obj$options$azure$chunkSize } @@ -296,10 +320,6 @@ setHttpTraffic <- function(value = FALSE) { chunkSize <- obj$options$azure$chunksize } - if (exists("chunkSize", envir = .doAzureBatchGlobals)) { - chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals) - } - chunkSizeKeyValuePair <- list(name = "chunkSize", value = as.character(chunkSize)) @@ -565,7 +585,7 @@ 
setHttpTraffic <- function(value = FALSE) { numberOfFailedTasks <- sum(unlist(failTasks)) - if (numberOfFailedTasks > 0 && deleteJob == FALSE) { + if (numberOfFailedTasks > 0 && jobAutoDelete == FALSE) { .createErrorViewerPane(id, failTasks) } @@ -587,7 +607,7 @@ setHttpTraffic <- function(value = FALSE) { fill = TRUE) # delete job from batch service and job result from storage blob - if (deleteJob) { + if (jobAutoDelete) { deleteJob(id) } diff --git a/R/jobUtilities.R b/R/jobUtilities.R index d278dc4c..ac4c3d64 100644 --- a/R/jobUtilities.R +++ b/R/jobUtilities.R @@ -252,19 +252,14 @@ getJobResult <- function(jobId) { #' Delete a job #' #' @param jobId A job id -#' @param deleteResult TRUE to delete job result in storage blob -#' container, FALSE to keep the result in storage blob container. #' #' @examples #' \dontrun{ #' deleteJob("job-001") -#' deleteJob("job-001", deleteResult = FALSE) #' } #' @export -deleteJob <- function(jobId, deleteResult = TRUE) { - if (deleteResult == TRUE) { - deleteStorageContainer(jobId) - } +deleteJob <- function(jobId) { + deleteStorageContainer(jobId) response <- rAzureBatch::deleteJob(jobId, content = "response") diff --git a/docs/31-long-running-job.md b/docs/31-long-running-job.md index d2272bab..f9a7ebe4 100644 --- a/docs/31-long-running-job.md +++ b/docs/31-long-running-job.md @@ -43,6 +43,8 @@ getJob returns job metadata, such as chunk size, whether cloud combine is enable succeeded: 0 failed: 5 total: 6 + + job state: completed ``` ## Get job list @@ -81,14 +83,14 @@ Once job is completed successfully, you can call getJobResult to retrieve the jo ### Clean up -Once you get the job result, you can delete the job. +Once you get the job result, you can delete the job and its result. ```R deleteJob(jobId) ``` -Please note deleteJob will delete the job at batch service, by default, it also deletes the storage container holding the job result. 
If you want to keep the job result around, you can set deleteResult parameter to FALSE +Please note deleteJob will delete the job at batch service and the storage container holding the job result. ```R - deleteJob(jobId, deleteResult = FALSE) + deleteJob(jobId) ``` A [working sample](../samples/long_running_job/long_running_job.R) can be found in the samples directory. diff --git a/man/deleteJob.Rd b/man/deleteJob.Rd index 3420e28a..4635a8e2 100644 --- a/man/deleteJob.Rd +++ b/man/deleteJob.Rd @@ -4,13 +4,10 @@ \alias{deleteJob} \title{Delete a job} \usage{ -deleteJob(jobId, deleteResult = TRUE) +deleteJob(jobId) } \arguments{ \item{jobId}{A job id} - -\item{deleteResult}{TRUE to delete job result in storage blob -container, FALSE to keep the result in storage blob container.} } \description{ Delete a job @@ -18,6 +15,5 @@ Delete a job \examples{ \dontrun{ deleteJob("job-001") -deleteJob("job-001", deleteResult = FALSE) } } diff --git a/man/setJobAutoDelete.Rd b/man/setJobAutoDelete.Rd new file mode 100644 index 00000000..e12f1ee0 --- /dev/null +++ b/man/setJobAutoDelete.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/doAzureParallel.R +\name{setJobAutoDelete} +\alias{setJobAutoDelete} +\title{Specify whether to delete job/jobresult after asychronous job is completed.} +\usage{ +setJobAutoDelete(value = TRUE) +} +\arguments{ +\item{value}{boolean of TRUE or FALSE} +} +\description{ +Specify whether to delete job/jobresult after asychronous job is completed. 
+} +\examples{ +setJobAutoDelete(FALSE) +} diff --git a/samples/long_running_job/long_running_job.R b/samples/long_running_job/long_running_job.R index e2002be8..c98b003d 100644 --- a/samples/long_running_job/long_running_job.R +++ b/samples/long_running_job/long_running_job.R @@ -65,4 +65,4 @@ jobResult <- getJobResult(jobId) doAzureParallel::stopCluster(cluster) # delete the job -rAzureBatch::deleteJob(jobId) +deleteJob(jobId) diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R index e92684cd..1c6c8d78 100644 --- a/tests/testthat/test-long-running-job.R +++ b/tests/testthat/test-long-running-job.R @@ -52,6 +52,6 @@ test_that("Long Running Job Test", { testthat::expect_equal(jobResult, list(2, 2, 2, 2)) - # delete the job + # delete the job and its result deleteJob(jobId) }) From 25cd3253042e7fb55c3f2e482fc05f4e996c8cb5 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 6 Dec 2017 12:42:50 -0800 Subject: [PATCH 20/25] rename jobAutoDelete to autoDeleteJob to workaround R bugs and update docs --- NAMESPACE | 2 +- R/doAzureParallel.R | 35 ++++++++------- docs/23-persistent-storage.md | 2 +- docs/40-troubleshooting.md | 13 +++++- docs/42-faq.md | 5 ++- man/setAutoDeleteJob.Rd | 17 ++++++++ man/setJobAutoDelete.Rd | 17 -------- tests/testthat/test-autodeletejob.R | 67 +++++++++++++++++++++++++++++ 8 files changed, 121 insertions(+), 37 deletions(-) create mode 100644 man/setAutoDeleteJob.Rd delete mode 100644 man/setJobAutoDelete.Rd create mode 100644 tests/testthat/test-autodeletejob.R diff --git a/NAMESPACE b/NAMESPACE index 0d9bb6fd..0e0d3134 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -17,10 +17,10 @@ export(listStorageFiles) export(makeCluster) export(registerDoAzureParallel) export(resizeCluster) +export(setAutoDeleteJob) export(setChunkSize) export(setCredentials) export(setHttpTraffic) -export(setJobAutoDelete) export(setReduce) export(setVerbose) export(stopCluster) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R 
index e153d7ab..66ad11e2 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -73,18 +73,18 @@ setChunkSize <- function(value = 1) { assign("chunkSize", value, envir = .doAzureBatchGlobals) } -#' Specify whether to delete job/jobresult after asychronous job is completed. +#' Specify whether to delete job and its result after asynchronous job is completed. #' #' @param value boolean of TRUE or FALSE #' #' @examples -#' setJobAutoDelete(FALSE) +#' setAutoDeleteJob(FALSE) #' @export -setJobAutoDelete <- function(value = TRUE) { +setAutoDeleteJob <- function(value = TRUE) { if (!is.logical(value)) - stop("setJobAutoDelete requires a boolean argument") + stop("setAutoDeleteJob requires a boolean argument") - assign("jobAutoDelete", value, envir = .doAzureBatchGlobals) + assign("autoDeleteJob", value, envir = .doAzureBatchGlobals) } #' Apply reduce function on a group of iterations of the foreach loop together per task. @@ -247,15 +247,15 @@ setHttpTraffic <- function(value = FALSE) { } # by default, delete both job and job result after synchronous job is completed - jobAutoDelete <- TRUE + autoDeleteJob <- TRUE - if (exists("jobAutoDelete", envir = .doAzureBatchGlobals)) { - jobAutoDelete <- get("jobAutoDelete", envir = .doAzureBatchGlobals) + if (exists("autoDeleteJob", envir = .doAzureBatchGlobals)) { + autoDeleteJob <- get("autoDeleteJob", envir = .doAzureBatchGlobals) } - if (!is.null(obj$options$azure$jobAutoDelete) && - is.logical(obj$options$azure$jobAutoDelete)) { - jobAutoDelete <- obj$options$azure$jobAutoDelete + if (!is.null(obj$options$azure$autoDeleteJob) && + is.logical(obj$options$azure$autoDeleteJob)) { + autoDeleteJob <- obj$options$azure$autoDeleteJob } inputs <- FALSE @@ -585,7 +585,7 @@ setHttpTraffic <- function(value = FALSE) { numberOfFailedTasks <- sum(unlist(failTasks)) - if (numberOfFailedTasks > 0 && jobAutoDelete == FALSE) { + if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) { .createErrorViewerPane(id, failTasks) } @@ -607,15 
+607,18 @@ setHttpTraffic <- function(value = FALSE) { fill = TRUE) # delete job from batch service and job result from storage blob - if (jobAutoDelete) { + if (autoDeleteJob) { deleteJob(id) } if (identical(obj$errorHandling, "stop") && !is.null(errorValue)) { - msg <- sprintf("task %d failed - '%s'", - errorIndex, - conditionMessage(errorValue)) + msg <- + sprintf( + "task %d failed - '%s'.\r\nBy default job and its result is deleted after run is over, use setAutoDeleteJob(FALSE) or autoDeleteJob = FALSE option to keep them for investigation.", + errorIndex, + conditionMessage(errorValue) + ) stop(simpleError(msg, call = expr)) } else { diff --git a/docs/23-persistent-storage.md b/docs/23-persistent-storage.md index 1e21b428..8c4c60d6 100644 --- a/docs/23-persistent-storage.md +++ b/docs/23-persistent-storage.md @@ -22,7 +22,7 @@ By default, *wait* is set to TRUE. This blocks the R session. By setting *wait* ## Getting results from storage -When the user is ready to get their results in a new session, the user use the following command: +When the user is ready to get their results in a new session, the user uses the following command: ```R my_job_id <- "my_unique_job_id" diff --git a/docs/40-troubleshooting.md b/docs/40-troubleshooting.md index e105d01c..bee49c3f 100644 --- a/docs/40-troubleshooting.md +++ b/docs/40-troubleshooting.md @@ -67,7 +67,18 @@ This issue is due to certain compiler flags not available in the default version Since doAzureParallel uses Microsoft R Open version 3.3 as the default version of R, it will automatically try to pull package from [MRAN](https://mran.microsoft.com/) rather than CRAN. This is a big benefit when wanting to use a constant version of a package but does not always contain references to the latest versions. 
To use a specific version from CRAN or a different MRAN snapshot date, use the [command line](./30-customize-cluster.md#running-commands-when-the-cluster-starts) in the cluster configuration to manually install the packages you need. ## Viewing files from Azure Storage -In every foreach run, the job will push its logs into Azure Storage that can be fetched by the user. For more information on reading log files, check out [managing storage](./41-managing-storage-via-R.md). +In every foreach run, the job will push its logs into Azure Storage that can be fetched by the user. For more information on reading log files, check out [managing storage](./41-managing-storage-via-R.md). + +By default, when wait is set to TRUE, job and its result is automatically deleted after the run is completed. To keep the job and its result for investigation purpose, you can set a global environment setting or specify an option in foreach loop to keep it. + +```R +# This will set a global setting to keep job and its result after run is completed. +setAutoDeleteJob(FALSE) + +# This will keep job and its result at each job level after run is completed. +options <- list(autoDeleteJob = FALSE) +foreach::foreach(i = 1:4, .options.azure = opt) %dopar% { ... } +``` ## Viewing files directly from compute node Cluster setup logs are not persisted. `getClusterFile` function will fetch any files including stdout and stderr log files in the cluster. This is particularly useful for users that utilizing [customize script](./30-customize-cluster.md) on their nodes and installing specific [packages](./20-package-management.md). diff --git a/docs/42-faq.md b/docs/42-faq.md index afbb63ea..6881a8c8 100644 --- a/docs/42-faq.md +++ b/docs/42-faq.md @@ -19,4 +19,7 @@ Yes. The [command line](./30-customize-cluster.md#running-commands-when-the-clus No. doAzureParallel is built on top of the Linux CentOS distribution and will not work with Windows-specific packages. 
## Why am I getting the error: could not find function "startsWith"? -doAzureParallel requires you to run R 3.3 or greater on you local machine. \ No newline at end of file +doAzureParallel requires you to run R 3.3 or greater on you local machine. + +## My job failed but I can't find my job and its result? +if you set wait = TRUE, job and its result is automatically deleted, to keep them for investigation purpose, you can set global option using setAutoDeleteJob(FALSE), or use autoDeleteJob option at foreach level. \ No newline at end of file diff --git a/man/setAutoDeleteJob.Rd b/man/setAutoDeleteJob.Rd new file mode 100644 index 00000000..1aec0d62 --- /dev/null +++ b/man/setAutoDeleteJob.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/doAzureParallel.R +\name{setAutoDeleteJob} +\alias{setAutoDeleteJob} +\title{Specify whether to delete job and its result after asynchronous job is completed.} +\usage{ +setAutoDeleteJob(value = TRUE) +} +\arguments{ +\item{value}{boolean of TRUE or FALSE} +} +\description{ +Specify whether to delete job and its result after asynchronous job is completed. +} +\examples{ +setAutoDeleteJob(FALSE) +} diff --git a/man/setJobAutoDelete.Rd b/man/setJobAutoDelete.Rd deleted file mode 100644 index e12f1ee0..00000000 --- a/man/setJobAutoDelete.Rd +++ /dev/null @@ -1,17 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/doAzureParallel.R -\name{setJobAutoDelete} -\alias{setJobAutoDelete} -\title{Specify whether to delete job/jobresult after asychronous job is completed.} -\usage{ -setJobAutoDelete(value = TRUE) -} -\arguments{ -\item{value}{boolean of TRUE or FALSE} -} -\description{ -Specify whether to delete job/jobresult after asychronous job is completed. 
-} -\examples{ -setJobAutoDelete(FALSE) -} diff --git a/tests/testthat/test-autodeletejob.R b/tests/testthat/test-autodeletejob.R new file mode 100644 index 00000000..bf6c2643 --- /dev/null +++ b/tests/testthat/test-autodeletejob.R @@ -0,0 +1,67 @@ +# Run this test for users to make sure the autodeletejob feature +# of doAzureParallel is still working +context("auto delete job scenario test") +test_that("auto delete job as foreach option test", { + testthat::skip("Live test") + testthat::skip_on_travis() + credentialsFileName <- "credentials.json" + clusterFileName <- "cluster.json" + + doAzureParallel::generateCredentialsConfig(credentialsFileName) + doAzureParallel::generateClusterConfig(clusterFileName) + + doAzureParallel::setCredentials(credentialsFileName) + cluster <- doAzureParallel::makeCluster(clusterFileName) + doAzureParallel::registerDoAzureParallel(cluster) + + # use autoDeleteJob flag to keep the job and its result + '%dopar%' <- foreach::'%dopar%' + res <- + foreach::foreach(i = 1:10, + .options.azure = list(autoDeleteJob = FALSE)) %dopar% { + i + } + + testthat::expect_equal(length(res), + 10) + + for (i in 1:10) { + testthat::expect_equal(res[[i]], + i) + } + + # find the job id from the output of above command and call deleteJob(jobId) when you no longer need the job and its result +}) + +test_that("auto delete job as global setting test", { + testthat::skip("Live test") + testthat::skip_on_travis() + credentialsFileName <- "credentials.json" + clusterFileName <- "cluster.json" + + doAzureParallel::generateCredentialsConfig(credentialsFileName) + doAzureParallel::generateClusterConfig(clusterFileName) + + doAzureParallel::setCredentials(credentialsFileName) + cluster <- doAzureParallel::makeCluster(clusterFileName) + doAzureParallel::registerDoAzureParallel(cluster) + + # set autoDeleteJob flag to FALSE to keep the job and its result + setAutoDeleteJob(FALSE) + + '%dopar%' <- foreach::'%dopar%' + res <- + foreach::foreach(i = 1:10) %dopar% { + i 
+ } + + testthat::expect_equal(length(res), + 10) + + for (i in 1:10) { + testthat::expect_equal(res[[i]], + i) + } + + # find the job id from the output of above command and call deleteJob(jobId) when you no longer need the job and its result +}) From 9429b42fae297f467cdcb68f6a1c104707734739 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 6 Dec 2017 12:46:52 -0800 Subject: [PATCH 21/25] update faq --- docs/42-faq.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/42-faq.md b/docs/42-faq.md index 6881a8c8..71d2d162 100644 --- a/docs/42-faq.md +++ b/docs/42-faq.md @@ -22,4 +22,7 @@ No. doAzureParallel is built on top of the Linux CentOS distribution and will no doAzureParallel requires you to run R 3.3 or greater on you local machine. ## My job failed but I can't find my job and its result? -if you set wait = TRUE, job and its result is automatically deleted, to keep them for investigation purpose, you can set global option using setAutoDeleteJob(FALSE), or use autoDeleteJob option at foreach level. \ No newline at end of file +if you set wait = TRUE, job and its result is automatically deleted, to keep them for investigation purpose, you can set global option using setAutoDeleteJob(FALSE), or use autoDeleteJob option at foreach level. + +## How do I cancel a job? +You can call terminateJob(jobId) to cancel a job. 
From 4c757e384b6ff8e6ab02cf53dcfd6067021853e5 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 6 Dec 2017 13:04:15 -0800 Subject: [PATCH 22/25] fix styling issues --- R/doAzureParallel.R | 5 ++++- tests/testthat/test-autodeletejob.R | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 66ad11e2..d0841370 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -615,7 +615,10 @@ setHttpTraffic <- function(value = FALSE) { !is.null(errorValue)) { msg <- sprintf( - "task %d failed - '%s'.\r\nBy default job and its result is deleted after run is over, use setAutoDeleteJob(FALSE) or autoDeleteJob = FALSE option to keep them for investigation.", + paste0( + "task %d failed - '%s'.\r\nBy default job and its result is deleted after run is over, use", + " setAutoDeleteJob(FALSE) or autoDeleteJob = FALSE option to keep them for investigation." + ), errorIndex, conditionMessage(errorValue) ) diff --git a/tests/testthat/test-autodeletejob.R b/tests/testthat/test-autodeletejob.R index bf6c2643..7fd9c2e6 100644 --- a/tests/testthat/test-autodeletejob.R +++ b/tests/testthat/test-autodeletejob.R @@ -63,5 +63,6 @@ test_that("auto delete job as global setting test", { i) } - # find the job id from the output of above command and call deleteJob(jobId) when you no longer need the job and its result + # find the job id from the output of above command and call + # deleteJob(jobId) when you no longer need the job and its result }) From 0b0423c2d5b039fb35ff2cd2fdee51f1adb4f90d Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 6 Dec 2017 13:11:44 -0800 Subject: [PATCH 23/25] more styling fix --- tests/testthat/test-autodeletejob.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/testthat/test-autodeletejob.R b/tests/testthat/test-autodeletejob.R index 7fd9c2e6..6b142c02 100644 --- a/tests/testthat/test-autodeletejob.R +++ b/tests/testthat/test-autodeletejob.R @@ -30,7 +30,8 @@ test_that("auto 
delete job as foreach option test", { i) } - # find the job id from the output of above command and call deleteJob(jobId) when you no longer need the job and its result + # find the job id from the output of above command and call + # deleteJob(jobId) when you no longer need the job and its result }) test_that("auto delete job as global setting test", { From b58c6478ae246edd1f5dd3fb6c491693b32ffa06 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 6 Dec 2017 14:54:47 -0800 Subject: [PATCH 24/25] roll back manual update to DESCRIPTION --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 0ec4e325..f9e7cb52 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: doAzureParallel Type: Package Title: doAzureParallel -Version: 0.6.2 +Version: 0.6.0 Author: Brian Hoang Maintainer: Brian Hoang Description: The project is for data experts who use R at scale. The project From 3457c9b8a0574d1fee3fa8a4e04b55a2e338704e Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 6 Dec 2017 15:06:34 -0800 Subject: [PATCH 25/25] add namespace to api call --- tests/testthat/test-long-running-job.R | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R index 1c6c8d78..461ddf2a 100644 --- a/tests/testthat/test-long-running-job.R +++ b/tests/testthat/test-long-running-job.R @@ -28,21 +28,21 @@ test_that("Long Running Job Test", { mean(1:3) } - job <- getJob(jobId) + job <- doAzureParallel::getJob(jobId) # get active/running job list filter <- filter <- list() filter$state <- c("active", "completed") - getJobList(filter) + doAzureParallel::getJobList(filter) # get job list for all jobs - getJobList() + doAzureParallel::getJobList() # wait 2 minutes for job to finish Sys.sleep(120) # get job result - jobResult <- getJobResult(jobId) + jobResult <- doAzureParallel::getJobResult(jobId) doAzureParallel::stopCluster(cluster) # 
verify the job result is correct @@ -53,5 +53,5 @@ test_that("Long Running Job Test", { list(2, 2, 2, 2)) # delete the job and its result - deleteJob(jobId) + doAzureParallel::deleteJob(jobId) })