Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 852dba0

Browse files
authored
Improvement on merge task performance (#223)
* Added doParallel support
* Renamed txt file
* Fixed lintr
* Restructured merger script
* Removed some error handling cases
* Fixed syntax
* Renamed error handling test
* Added accumulator
* Using filter on function
* Proper filtering of tasks
* Fixed merge naming
* Added error handling for worker, separate merge task function
* Added buckets
* Added addSubMergeTask
* Added merge sub task functions
* Fixing file names
* Fixed sorting order for merger
* Added space
* Merger in R
* Clean up merger worker script
* Added mergeSize option
* By default one bucket
* Removed merge size flag
* Fixed test
* Fixed lint code
* Fixed more lintr issues
* Fixed lintr
* Fixed the added comments
* Fixed the if statement
* Add list combine function validation
* Removed verification
* Fixed lintr
1 parent 1b60e47 commit 852dba0

File tree

7 files changed

+515
-134
lines changed

7 files changed

+515
-134
lines changed

R/helpers.R renamed to R/batchHelperFunctions.R

Lines changed: 179 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,182 @@
1+
# Adds the final merge task of a job through rAzureBatch::addTask().
#
# Arguments:
#   jobId    - job id; also passed as the storage container name and exported
#              to the task as CONTAINER_NAME.
#   taskId   - task id; used to name the result, log, stdout and stderr blobs.
#   rCommand - command handed to dockerRunCommand() together with
#              `containerImage`.
#   ...      - optional named values; only `dependsOn`, `cloudCombine` and
#              `containerImage` are read here.
#
# Returns the value of the rAzureBatch::addTask() call.
addFinalMergeTask <- function(jobId, taskId, rCommand, ...){
  credentials <- rAzureBatch::getStorageCredentials()

  extraArgs <- list(...)
  dependsOn <- extraArgs$dependsOn
  cloudCombine <- extraArgs$cloudCombine
  containerImage <- extraArgs$containerImage

  resultFile <- paste0(taskId, "-result", ".rds")
  storageAccount <- credentials$name

  # The blobxfer download step is only needed when cloudCombine is supplied;
  # otherwise it stays NULL and drops out of the command vector below.
  downloadCommand <- NULL
  if (!is.null(cloudCombine)) {
    assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)

    blobxferArgs <- sprintf(
      "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds",
      storageAccount,
      jobId,
      "$AZ_BATCH_TASK_WORKING_DIR"
    )

    downloadCommand <-
      dockerRunCommand("alfpark/blobxfer:0.12.1", blobxferArgs, "blobxfer", FALSE)
  }

  # Without explicit dependencies, send a default exit condition whose
  # dependencyAction is "satisfy"; with dependencies, send none.
  exitConditions <-
    if (is.null(dependsOn)) {
      list(default = list(dependencyAction = "satisfy"))
    } else {
      NULL
    }

  containerUrl <-
    rAzureBatch::createBlobUrl(
      storageAccount = credentials$name,
      containerName = jobId,
      sasToken = rAzureBatch::createSasToken("w", "c", jobId)
    )

  # Every output file shares the same destination container and upload
  # condition; only the local pattern and the blob path differ.
  uploadOnCompletion <- function(pattern, blobPath) {
    list(
      filePattern = pattern,
      destination = list(container = list(
        path = blobPath,
        containerUrl = containerUrl
      )),
      uploadOptions = list(uploadCondition = "taskCompletion")
    )
  }

  outputFiles <- list(
    uploadOnCompletion(resultFile, paste0("results/", resultFile)),
    uploadOnCompletion(paste0(taskId, ".txt"),
                       paste0("logs/", taskId, ".txt")),
    uploadOnCompletion("../stdout.txt",
                       paste0("stdout/", taskId, "-stdout.txt")),
    uploadOnCompletion("../stderr.txt",
                       paste0("stderr/", taskId, "-stderr.txt"))
  )

  commands <- linuxWrapCommands(
    c(downloadCommand, dockerRunCommand(containerImage, rCommand))
  )

  # Serialise the SAS token fields as "?key=value&key=value..." with each
  # value URL-escaped; the trailing "&" is trimmed after the loop.
  sasToken <- rAzureBatch::createSasToken("rwcl", "c", jobId)
  sasQuery <- "?"
  for (field in names(sasToken)) {
    escapedValue <- RCurl::curlEscape(sasToken[[field]])
    sasQuery <- paste0(sasQuery, field, "=", escapedValue, "&")
  }
  sasQuery <- substr(sasQuery, 1, nchar(sasQuery) - 1)

  environmentSettings <- list(
    list(name = "BLOBXFER_SASKEY", value = sasQuery),
    list(name = "CONTAINER_NAME", value = jobId)
  )

  rAzureBatch::addTask(
    jobId,
    taskId,
    environmentSettings = environmentSettings,
    commandLine = commands,
    dependsOn = dependsOn,
    outputFiles = outputFiles,
    exitConditions = exitConditions
  )
}
119+
120+
# Adds an intermediate (sub) merge task through rAzureBatch::addTask().
#
# Arguments:
#   jobId    - job id; also passed to blobxfer as the container to download
#              from.
#   taskId   - task id; the download step includes "<taskId>/*.rds".
#   rCommand - command handed to dockerRunCommand() together with
#              `containerImage`.
#   ...      - optional named values; only `dependsOn`, `containerImage` and
#              `outputFiles` are read here.
#
# Returns the value of the rAzureBatch::addTask() call.
addSubMergeTask <- function(jobId, taskId, rCommand, ...){
  credentials <- rAzureBatch::getStorageCredentials()
  storageAccount <- credentials$name

  extraArgs <- list(...)
  dependsOn <- extraArgs$dependsOn
  containerImage <- extraArgs$containerImage
  outputFiles <- extraArgs$outputFiles

  blobxferArgs <- sprintf(
    "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include %s/*.rds",
    storageAccount,
    jobId,
    "$AZ_BATCH_TASK_WORKING_DIR",
    taskId
  )

  # Without explicit dependencies, send a default exit condition whose
  # dependencyAction is "satisfy"; with dependencies, send none.
  exitConditions <-
    if (is.null(dependsOn)) {
      list(default = list(dependencyAction = "satisfy"))
    } else {
      NULL
    }

  # Download step first, then the merge command in the user's container.
  downloadCommand <-
    dockerRunCommand("alfpark/blobxfer:0.12.1", blobxferArgs, "blobxfer", FALSE)
  mergeCommand <- dockerRunCommand(containerImage, rCommand)
  commands <- linuxWrapCommands(c(downloadCommand, mergeCommand))

  # Serialise the SAS token fields as "?key=value&key=value..." with each
  # value URL-escaped; the trailing "&" is trimmed after the loop.
  sasToken <- rAzureBatch::createSasToken("rwcl", "c", jobId)
  sasQuery <- "?"
  for (field in names(sasToken)) {
    escapedValue <- RCurl::curlEscape(sasToken[[field]])
    sasQuery <- paste0(sasQuery, field, "=", escapedValue, "&")
  }
  sasQuery <- substr(sasQuery, 1, nchar(sasQuery) - 1)

  sasSetting <- list(name = "BLOBXFER_SASKEY",
                     value = sasQuery)

  rAzureBatch::addTask(
    jobId,
    taskId,
    commandLine = commands,
    environmentSettings = list(sasSetting),
    dependsOn = dependsOn,
    outputFiles = outputFiles,
    exitConditions = exitConditions
  )
}
179+
1180
.addTask <- function(jobId, taskId, rCommand, ...) {
2181
storageCredentials <- rAzureBatch::getStorageCredentials()
3182

@@ -14,7 +193,6 @@
14193
maxTaskRetryCount <- args$maxTaskRetryCount
15194
}
16195

17-
resultFile <- paste0(taskId, "-result", ".rds")
18196
accountName <- storageCredentials$name
19197

20198
resourceFiles <- NULL
@@ -67,14 +245,6 @@
67245
)
68246

69247
outputFiles <- list(
70-
list(
71-
filePattern = resultFile,
72-
destination = list(container = list(
73-
path = paste0("result/", resultFile),
74-
containerUrl = containerUrl
75-
)),
76-
uploadOptions = list(uploadCondition = "taskCompletion")
77-
),
78248
list(
79249
filePattern = paste0(taskId, ".txt"),
80250
destination = list(container = list(

0 commit comments

Comments
 (0)