1 change: 1 addition & 0 deletions .travis.yml
@@ -8,3 +8,4 @@ warnings_are_errors: false
r_github_packages:
- Azure/rAzureBatch
- jimhester/lintr
- hadley/nycflights13
34 changes: 24 additions & 10 deletions R/doAzureParallel.R
@@ -224,6 +224,12 @@ setHttpTraffic <- function(value = FALSE) {
assign("bioconductor", bioconductorPackages, .doAzureBatchGlobals)
assign("pkgName", pkgName, .doAzureBatchGlobals)

isDataSet <- hasDataSet(argsList)

if (!isDataSet) {
assign("argsList", argsList, .doAzureBatchGlobals)
}

if (!is.null(obj$options$azure$job)) {
id <- obj$options$azure$job
}
@@ -528,18 +534,26 @@ setHttpTraffic <- function(value = FALSE) {
tasks <- lapply(1:length(endIndices), function(i) {
startIndex <- startIndices[i]
endIndex <- endIndices[i]
taskId <- paste0(id, "-task", i)
taskId <- as.character(i)

args <- NULL
if (isDataSet) {
args <- argsList[startIndex:endIndex]
}

.addTask(
jobId = id,
taskId = taskId,
rCommand = sprintf(
"Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R > $AZ_BATCH_TASK_ID.txt"),
args = argsList[startIndex:endIndex],
"Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %i %i %i > $AZ_BATCH_TASK_ID.txt",
startIndex,
endIndex,
isDataSet),
envir = .doAzureBatchGlobals,
packages = obj$packages,
outputFiles = obj$options$azure$outputFiles,
containerImage = data$containerImage
containerImage = data$containerImage,
args = args
)

cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "")
@@ -548,14 +562,11 @@ setHttpTraffic <- function(value = FALSE) {
return(taskId)
})

rAzureBatch::updateJob(id)

if (enableCloudCombine) {
cat("\nSubmitting merge task")
mergeTaskId <- paste0(id, "-merge")
.addTask(
jobId = id,
taskId = mergeTaskId,
taskId = "merge",
rCommand = sprintf(
"Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s > $AZ_BATCH_TASK_ID.txt",
length(tasks),
@@ -564,14 +575,17 @@ setHttpTraffic <- function(value = FALSE) {
),
envir = .doAzureBatchGlobals,
packages = obj$packages,
dependsOn = tasks,
dependsOn = list(taskIdRanges = list(list(start = 1, end = length(tasks)))),
cloudCombine = cloudCombine,
outputFiles = obj$options$azure$outputFiles,
containerImage = data$containerImage
)
cat(". . .")
}

# Updating the job to terminate after all tasks are completed
rAzureBatch::updateJob(id)

if (wait) {
if (!is.null(obj$packages) ||
!is.null(githubPackages) ||
@@ -588,7 +602,7 @@ setHttpTraffic <- function(value = FALSE) {
response <-
rAzureBatch::downloadBlob(
id,
paste0("result/", id, "-merge-result.rds"),
paste0("result/", "merge-result.rds"),
sasToken = sasToken,
accountName = storageCredentials$name,
downloadPath = tempFile,
33 changes: 15 additions & 18 deletions R/helpers.R
@@ -3,17 +3,27 @@

args <- list(...)
.doAzureBatchGlobals <- args$envir
argsList <- args$args
dependsOn <- args$dependsOn
argsList <- args$args
cloudCombine <- args$cloudCombine
userOutputFiles <- args$outputFiles
containerImage <- args$containerImage

resultFile <- paste0(taskId, "-result", ".rds")
accountName <- storageCredentials$name

resourceFiles <- NULL
if (!is.null(argsList)) {
assign("argsList", argsList, .doAzureBatchGlobals)
envFile <- paste0(taskId, ".rds")
saveRDS(argsList, file = envFile)
rAzureBatch::uploadBlob(jobId, file.path(getwd(), envFile))
file.remove(envFile)

readToken <- rAzureBatch::createSasToken("r", "c", jobId)
envFileUrl <-
rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, readToken)
resourceFiles <-
list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
}

# Only use the download command if cloudCombine is enabled
@@ -34,22 +44,9 @@
commands <- c(downloadCommand)
}

envFile <- paste0(taskId, ".rds")
Contributor:
I think I might be missing something here, but don't we still need to upload a single argument blob to storage? Is that what was intended with the azbatchenv environment in the job prep? Are there any cases where this object gets too big and will not work? Would we ever need to chunk it up?
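
For context on the question above, a condensed sketch of how arguments now reach a worker, assembled from the doAzureParallel.R, helpers.R, and worker.R hunks in this PR (all names come from the diff; the snippet itself is illustrative and not part of the change):

## Submission side (doAzureParallel.R): plain argument lists travel once in the
## shared job-prep environment instead of one blob per task.
isDataSet <- hasDataSet(argsList)
if (!isDataSet) {
  assign("argsList", argsList, .doAzureBatchGlobals)
}

## Task side (helpers.R): only data-set chunks are serialized and uploaded as a
## per-task resource file.
if (!is.null(args)) {
  envFile <- paste0(taskId, ".rds")
  saveRDS(args, file = envFile)
  rAzureBatch::uploadBlob(jobId, file.path(getwd(), envFile))
  file.remove(envFile)
}

## Worker side (worker.R): read the per-task blob for data sets, otherwise
## slice the shared list using the start/end indices passed on the command line.
argsList <- if (isDataSet) {
  readRDS(batchTaskEnvironment)
} else {
  azbatchenv$argsList[startIndex:endIndex]
}

So a single argument blob appears to be uploaded only when a data set is detected; otherwise the whole argsList rides along in the shared environment and is sliced by index on the worker.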

saveRDS(argsList, file = envFile)
rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", envFile))
file.remove(envFile)

sasToken <- rAzureBatch::createSasToken("r", "c", jobId)
writeToken <- rAzureBatch::createSasToken("w", "c", jobId)

envFileUrl <-
rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, sasToken)
resourceFiles <-
list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))

exitConditions <- NULL
if (!is.null(args$dependsOn)) {
dependsOn <- list(taskIds = dependsOn)
dependsOn <- args$dependsOn
}
else {
exitConditions <- list(default = list(dependencyAction = "satisfy"))
@@ -59,7 +56,7 @@
rAzureBatch::createBlobUrl(
storageAccount = storageCredentials$name,
containerName = jobId,
sasToken = writeToken
sasToken = rAzureBatch::createSasToken("w", "c", jobId)
)

outputFiles <- list(
@@ -101,7 +98,7 @@

commands <-
c(commands,
dockerRunCommand(containerImage, rCommand, taskId))
dockerRunCommand(containerImage, rCommand))

commands <- linuxWrapCommands(commands)

2 changes: 1 addition & 1 deletion R/jobUtilities.R
@@ -464,7 +464,7 @@ waitForTasksToComplete <-
# Wait for merge task to complete
repeat {
# Verify that the merge cloud task didn't have any errors
mergeTask <- rAzureBatch::getTask(jobId, paste0(jobId, "-merge"))
mergeTask <- rAzureBatch::getTask(jobId, "merge")

# This test needs to go first as Batch service will not return an execution info as null
if (is.null(mergeTask$executionInfo$result)) {
13 changes: 13 additions & 0 deletions R/utility.R
@@ -243,3 +243,16 @@ readMetadataBlob <- function(jobId) {
areShallowEqual <- function(a, b) {
!is.null(a) && !is.null(b) && a == b
}

hasDataSet <- function(list) {
if (length(list) > 0) {
for (arg in list[[1]]) {
# Data frames are shown as list in the foreach iterator
if (typeof(arg) == "list") {
return(TRUE)
}
}
}

return(FALSE)
}
7 changes: 4 additions & 3 deletions inst/startup/merger.R
@@ -18,6 +18,7 @@ chunkSize <- as.integer(args[2])
errorHandling <- args[3]

batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")
batchJobPreparationDirectory <-
Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
@@ -75,14 +76,14 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
}

for (t in 1:length(task)) {
results[count] <- task[t]
results[[count]] <- task[[t]]
count <- count + 1
}
}

saveRDS(results, file = file.path(
batchTaskWorkingDirectory,
paste0(batchJobId, "-merge-result.rds")
paste0(batchTaskId, "-result.rds")
))
}
else if (errorHandling == "pass") {
@@ -111,7 +112,7 @@

saveRDS(results, file = file.path(
batchTaskWorkingDirectory,
paste0(batchJobId, "-merge-result.rds")
paste0(batchTaskId, "-result.rds")
))
}

15 changes: 12 additions & 3 deletions inst/startup/worker.R
@@ -2,6 +2,10 @@
args <- commandArgs(trailingOnly = TRUE)
workerErrorStatus <- 0

startIndex <- as.integer(args[1])
endIndex <- as.integer(args[2])
isDataSet <- as.logical(as.integer(args[3]))

jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
.libPaths(c(
jobPrepDirectory,
@@ -68,7 +72,12 @@ setwd(batchTaskWorkingDirectory)

azbatchenv <-
readRDS(paste0(batchJobPreparationDirectory, "/", batchJobEnvironment))
taskArgs <- readRDS(batchTaskEnvironment)

if (isDataSet) {
argsList <- readRDS(batchTaskEnvironment)
} else {
argsList <- azbatchenv$argsList[startIndex:endIndex]
}

for (package in azbatchenv$packages) {
library(package, character.only = TRUE)
@@ -83,7 +92,7 @@ if (!is.null(azbatchenv$inputs)) {
options("az_config" = list(container = azbatchenv$inputs))
}

result <- lapply(taskArgs, function(args) {
result <- lapply(argsList, function(args) {
tryCatch({
lapply(names(args), function(n)
assign(n, args[[n]], pos = azbatchenv$exportenv))
@@ -99,7 +108,7 @@ result <- lapply(taskArgs, function(args) {
})
})

if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) {
if (!is.null(azbatchenv$gather) && length(argsList) > 1) {
result <- Reduce(azbatchenv$gather, result)
}

24 changes: 24 additions & 0 deletions tests/testthat/test-hasdataset.R
@@ -0,0 +1,24 @@
if (requireNamespace("nycflights13", quietly = TRUE)) {
context("hasDataSet function")

test_that("Arguments contains data set", {
byCarrierList <- split(nycflights13::flights, nycflights13::flights$carrier)
it <- iterators::iter(byCarrierList)
argsList <- as.list(it)

hasDataSet <- hasDataSet(argsList)

expect_equal(hasDataSet, TRUE)
})

test_that("Arguments does not contain data set", {
args <- seq(1:10)
it <- iterators::iter(args)
argsList <- as.list(it)

hasDataSet <- hasDataSet(argsList)

expect_equal(hasDataSet, FALSE)
})

}