This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit afde92f

Fix/add task perf (#195)
* Added task id range
* Removed upload blob methods
* Removed upload blob
* Fixed trailing whitespace
* Discarded job id on merge task id name
* Adding chunk logic for argsList
* Added check for args containing data sets
* Removed container name for docker run command for all tasks
* Added test for hasDataSet
* Fix travis yml
* Adding before_install for R
* Removed before install, added github package of nycflights13
1 parent 9d50403 commit afde92f


8 files changed: +94 −35 lines


.travis.yml

Lines changed: 1 addition & 0 deletions

@@ -8,3 +8,4 @@ warnings_are_errors: false
 r_github_packages:
 - Azure/rAzureBatch
 - jimhester/lintr
+- hadley/nycflights13

R/doAzureParallel.R

Lines changed: 24 additions & 10 deletions

@@ -224,6 +224,12 @@ setHttpTraffic <- function(value = FALSE) {
   assign("bioconductor", bioconductorPackages, .doAzureBatchGlobals)
   assign("pkgName", pkgName, .doAzureBatchGlobals)

+  isDataSet <- hasDataSet(argsList)
+
+  if (!isDataSet) {
+    assign("argsList", argsList, .doAzureBatchGlobals)
+  }
+
   if (!is.null(obj$options$azure$job)) {
     id <- obj$options$azure$job
   }
@@ -528,18 +534,26 @@ setHttpTraffic <- function(value = FALSE) {
   tasks <- lapply(1:length(endIndices), function(i) {
     startIndex <- startIndices[i]
     endIndex <- endIndices[i]
-    taskId <- paste0(id, "-task", i)
+    taskId <- as.character(i)
+
+    args <- NULL
+    if (isDataSet) {
+      args <- argsList[startIndex:endIndex]
+    }

     .addTask(
       jobId = id,
       taskId = taskId,
       rCommand = sprintf(
-        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R > $AZ_BATCH_TASK_ID.txt"),
-      args = argsList[startIndex:endIndex],
+        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %i %i %i > $AZ_BATCH_TASK_ID.txt",
+        startIndex,
+        endIndex,
+        isDataSet),
       envir = .doAzureBatchGlobals,
       packages = obj$packages,
       outputFiles = obj$options$azure$outputFiles,
-      containerImage = data$containerImage
+      containerImage = data$containerImage,
+      args = args
     )

     cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "")
@@ -548,14 +562,11 @@ setHttpTraffic <- function(value = FALSE) {
     return(taskId)
   })

-  rAzureBatch::updateJob(id)
-
  if (enableCloudCombine) {
    cat("\nSubmitting merge task")
-    mergeTaskId <- paste0(id, "-merge")
    .addTask(
      jobId = id,
-      taskId = mergeTaskId,
+      taskId = "merge",
      rCommand = sprintf(
        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s > $AZ_BATCH_TASK_ID.txt",
        length(tasks),
@@ -564,14 +575,17 @@ setHttpTraffic <- function(value = FALSE) {
      ),
      envir = .doAzureBatchGlobals,
      packages = obj$packages,
-      dependsOn = tasks,
+      dependsOn = list(taskIdRanges = list(list(start = 1, end = length(tasks)))),
      cloudCombine = cloudCombine,
      outputFiles = obj$options$azure$outputFiles,
      containerImage = data$containerImage
    )
    cat(". . .")
  }

+  # Updating the job to terminate after all tasks are completed
+  rAzureBatch::updateJob(id)
+
  if (wait) {
    if (!is.null(obj$packages) ||
        !is.null(githubPackages) ||
@@ -588,7 +602,7 @@ setHttpTraffic <- function(value = FALSE) {
      response <-
        rAzureBatch::downloadBlob(
          id,
-          paste0("result/", id, "-merge-result.rds"),
+          paste0("result/", "merge-result.rds"),
          sasToken = sasToken,
          accountName = storageCredentials$name,
          downloadPath = tempFile,
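
Note: tasks are now named by bare index ("1", "2", ...), which lets the merge
task declare its dependency as a single task id range instead of one entry per
task, and plain (non-data-set) arguments travel once in the shared job
environment rather than as one uploaded blob per task. A minimal sketch of the
command the scheduler now emits for a chunk covering items 11..20 (the index
values are illustrative, not from this diff):

    # Command line built for one worker task; sprintf's %i renders the
    # logical isDataSet as 0/1, which worker.R parses back with as.logical().
    sprintf(
      "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %i %i %i > $AZ_BATCH_TASK_ID.txt",
      11L, 20L, FALSE)
    #> "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R 11 20 0 > $AZ_BATCH_TASK_ID.txt"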

R/helpers.R

Lines changed: 15 additions & 18 deletions

@@ -3,17 +3,27 @@

   args <- list(...)
   .doAzureBatchGlobals <- args$envir
-  argsList <- args$args
   dependsOn <- args$dependsOn
+  argsList <- args$args
   cloudCombine <- args$cloudCombine
   userOutputFiles <- args$outputFiles
   containerImage <- args$containerImage

   resultFile <- paste0(taskId, "-result", ".rds")
   accountName <- storageCredentials$name

+  resourceFiles <- NULL
   if (!is.null(argsList)) {
-    assign("argsList", argsList, .doAzureBatchGlobals)
+    envFile <- paste0(taskId, ".rds")
+    saveRDS(argsList, file = envFile)
+    rAzureBatch::uploadBlob(jobId, file.path(getwd(), envFile))
+    file.remove(envFile)
+
+    readToken <- rAzureBatch::createSasToken("r", "c", jobId)
+    envFileUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, readToken)
+    resourceFiles <-
+      list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
   }

   # Only use the download command if cloudCombine is enabled
@@ -34,22 +44,9 @@
     commands <- c(downloadCommand)
   }

-  envFile <- paste0(taskId, ".rds")
-  saveRDS(argsList, file = envFile)
-  rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", envFile))
-  file.remove(envFile)
-
-  sasToken <- rAzureBatch::createSasToken("r", "c", jobId)
-  writeToken <- rAzureBatch::createSasToken("w", "c", jobId)
-
-  envFileUrl <-
-    rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, sasToken)
-  resourceFiles <-
-    list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
-
   exitConditions <- NULL
   if (!is.null(args$dependsOn)) {
-    dependsOn <- list(taskIds = dependsOn)
+    dependsOn <- args$dependsOn
   }
   else {
     exitConditions <- list(default = list(dependencyAction = "satisfy"))
@@ -59,7 +56,7 @@
     rAzureBatch::createBlobUrl(
       storageAccount = storageCredentials$name,
       containerName = jobId,
-      sasToken = writeToken
+      sasToken = rAzureBatch::createSasToken("w", "c", jobId)
     )

   outputFiles <- list(
@@ -101,7 +98,7 @@

   commands <-
     c(commands,
-      dockerRunCommand(containerImage, rCommand, taskId))
+      dockerRunCommand(containerImage, rCommand))

   commands <- linuxWrapCommands(commands)
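
Note: .addTask now serializes and uploads a per-task <taskId>.rds only when it
is actually handed args (the data-set path), and it forwards dependsOn exactly
as supplied instead of always wrapping it in list(taskIds = ...). A hedged
sketch of the two dependency shapes (field names as used by the Azure Batch
task API; rAzureBatch assembles the actual payload):

    # Explicit ids (old behaviour): payload grows with the task count.
    dependsOn <- list(taskIds = list("job1-task1", "job1-task2", "job1-task3"))

    # Id range (new behaviour): constant-size payload for any task count.
    dependsOn <- list(taskIdRanges = list(list(start = 1, end = 3)))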

R/jobUtilities.R

Lines changed: 1 addition & 1 deletion

@@ -464,7 +464,7 @@ waitForTasksToComplete <-
     # Wait for merge task to complete
     repeat {
       # Verify that the merge cloud task didn't have any errors
-      mergeTask <- rAzureBatch::getTask(jobId, paste0(jobId, "-merge"))
+      mergeTask <- rAzureBatch::getTask(jobId, "merge")

       # This test needs to go first as Batch service will not return an execution info as null
       if (is.null(mergeTask$executionInfo$result)) {

R/utility.R

Lines changed: 13 additions & 0 deletions

@@ -243,3 +243,16 @@ readMetadataBlob <- function(jobId) {
 areShallowEqual <- function(a, b) {
   !is.null(a) && !is.null(b) && a == b
 }
+
+hasDataSet <- function(list) {
+  if (length(list) > 0) {
+    for (arg in list[[1]]) {
+      # Data frames are shown as list in the foreach iterator
+      if (typeof(arg) == "list") {
+        return(TRUE)
+      }
+    }
+  }
+
+  return(FALSE)
+}
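
Note: hasDataSet works because typeof() reports the underlying storage type,
and a data frame is stored as a list of its columns; only the first element of
argsList is inspected, on the assumption that every iteration has the same
shape. A small illustration with hypothetical argument lists (each element of
argsList holds the arguments for one foreach iteration):

    typeof(data.frame(x = 1:3))
    #> "list"

    hasDataSet(list(list(chunk = data.frame(x = 1:3))))  # TRUE
    hasDataSet(list(list(x = 1, y = 2)))                 # FALSE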

inst/startup/merger.R

Lines changed: 4 additions & 3 deletions

@@ -18,6 +18,7 @@ chunkSize <- as.integer(args[2])
 errorHandling <- args[3]

 batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
+batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")
 batchJobPreparationDirectory <-
   Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
 batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
@@ -75,14 +76,14 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
     }

     for (t in 1:length(task)) {
-      results[count] <- task[t]
+      results[[count]] <- task[[t]]
       count <- count + 1
     }
   }

   saveRDS(results, file = file.path(
     batchTaskWorkingDirectory,
-    paste0(batchJobId, "-merge-result.rds")
+    paste0(batchTaskId, "-result.rds")
   ))
 }
 else if (errorHandling == "pass") {
@@ -111,7 +112,7 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {

   saveRDS(results, file = file.path(
     batchTaskWorkingDirectory,
-    paste0(batchJobId, "-merge-result.rds")
+    paste0(batchTaskId, "-result.rds")
   ))
 }
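
Note: the switch from results[count] <- task[t] to results[[count]] <- task[[t]]
indexes list elements directly: for a list x, x[i] is a length-one sub-list
while x[[i]] is the element itself. A quick illustration:

    task <- list(42)
    task[1]    # a list of length 1 containing 42
    task[[1]]  # the element itself: 42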

inst/startup/worker.R

Lines changed: 12 additions & 3 deletions

@@ -2,6 +2,10 @@
 args <- commandArgs(trailingOnly = TRUE)
 workerErrorStatus <- 0

+startIndex <- as.integer(args[1])
+endIndex <- as.integer(args[2])
+isDataSet <- as.logical(as.integer(args[3]))
+
 jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
 .libPaths(c(
   jobPrepDirectory,
@@ -68,7 +72,12 @@ setwd(batchTaskWorkingDirectory)

 azbatchenv <-
   readRDS(paste0(batchJobPreparationDirectory, "/", batchJobEnvironment))
-taskArgs <- readRDS(batchTaskEnvironment)
+
+if (isDataSet) {
+  argsList <- readRDS(batchTaskEnvironment)
+} else {
+  argsList <- azbatchenv$argsList[startIndex:endIndex]
+}

 for (package in azbatchenv$packages) {
   library(package, character.only = TRUE)
@@ -83,7 +92,7 @@ if (!is.null(azbatchenv$inputs)) {
   options("az_config" = list(container = azbatchenv$inputs))
 }

-result <- lapply(taskArgs, function(args) {
+result <- lapply(argsList, function(args) {
   tryCatch({
     lapply(names(args), function(n)
       assign(n, args[[n]], pos = azbatchenv$exportenv))
@@ -99,7 +108,7 @@ result <- lapply(taskArgs, function(args) {
   })
 })

-if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) {
+if (!is.null(azbatchenv$gather) && length(argsList) > 1) {
   result <- Reduce(azbatchenv$gather, result)
 }
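
Note: the worker now receives its slice of work as positional command-line
arguments; only data-set tasks still read a per-task RDS file, while plain
arguments are sliced out of the shared azbatchenv$argsList. A sketch of the
argument round-trip, assuming the scheduler passed "11 20 0":

    args <- c("11", "20", "0")                     # as seen by commandArgs()
    startIndex <- as.integer(args[1])              # 11
    endIndex <- as.integer(args[2])                # 20
    isDataSet <- as.logical(as.integer(args[3]))   # FALSE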

tests/testthat/test-hasdataset.R

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+if (requireNamespace("nycflights13", quietly = TRUE)) {
+  context("hasDataSet function")
+
+  test_that("Arguments contains data set", {
+    byCarrierList <- split(nycflights13::flights, nycflights13::flights$carrier)
+    it <- iterators::iter(byCarrierList)
+    argsList <- as.list(it)
+
+    hasDataSet <- hasDataSet(argsList)
+
+    expect_equal(hasDataSet, TRUE)
+  })
+
+  test_that("Arguments does not contain data set", {
+    args <- seq(1:10)
+    it <- iterators::iter(args)
+    argsList <- as.list(it)
+
+    hasDataSet <- hasDataSet(argsList)
+
+    expect_equal(hasDataSet, FALSE)
+  })
+}
