Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 169e75f

Browse files
authored
Fixed job creation (#138)
1 parent 40a2cf0 commit 169e75f

File tree

2 files changed

+153
-144
lines changed

2 files changed

+153
-144
lines changed

R/doAzureParallel.R

Lines changed: 152 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -253,174 +253,183 @@ setHttpTraffic <- function(value = FALSE) {
253253
assign("enableCloudCombine", enableCloudCombine, envir = .doAzureBatchGlobals)
254254
assign("cloudCombine", cloudCombine, envir = .doAzureBatchGlobals)
255255

256-
retryCounter <- 0
257-
maxRetryCount <- 5
258-
startupFolderName <- "startup"
259-
containerResponse <- NULL
260-
jobquotaReachedResponse <- NULL
261-
while (retryCounter < maxRetryCount) {
262-
sprintf("job id is: %s", id)
263-
# try to submit the job. We may run into naming conflicts. If so, try again
264-
tryCatch({
265-
retryCounter <- retryCounter + 1
266-
267-
response <- rAzureBatch::createContainer(id, content = "text")
268-
if (grepl("ContainerAlreadyExists", response)) {
269-
if (!is.null(obj$options$azure$job)) {
270-
containerResponse <- grepl("ContainerAlreadyExists", response)
271-
break
272-
273-
}
256+
resourceFiles <- list()
257+
if (!is.null(obj$options$azure$resourceFiles)) {
258+
resourceFiles <- obj$options$azure$resourceFiles
259+
}
274260

275-
stop("Container already exists. Multiple jobs may possibly be running.")
276-
}
261+
if (!is.null(obj$options$azure$resourcefiles)) {
262+
resourceFiles <- obj$options$azure$resourcefiles
263+
}
277264

278-
rAzureBatch::uploadBlob(id,
279-
system.file(startupFolderName, "worker.R", package = "doAzureParallel"))
280-
rAzureBatch::uploadBlob(id,
281-
system.file(startupFolderName, "merger.R", package = "doAzureParallel"))
282-
rAzureBatch::uploadBlob(
283-
id,
284-
system.file(startupFolderName, "install_github.R", package = "doAzureParallel")
285-
)
286-
rAzureBatch::uploadBlob(
287-
id,
288-
system.file(startupFolderName, "install_cran.R", package = "doAzureParallel")
289-
)
290-
rAzureBatch::uploadBlob(
291-
id,
292-
system.file(startupFolderName, "install_bioconductor.R", package = "doAzureParallel")
293-
)
265+
enableCloudCombineKeyValuePair <-
266+
list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
294267

295-
# Setting up common job environment for all tasks
296-
jobFileName <- paste0(id, ".rds")
297-
saveRDS(.doAzureBatchGlobals, file = jobFileName)
268+
chunkSize <- 1
298269

299-
rAzureBatch::uploadBlob(id, paste0(getwd(), "/", jobFileName))
270+
if (!is.null(obj$options$azure$chunkSize)) {
271+
chunkSize <- obj$options$azure$chunkSize
272+
}
300273

301-
file.remove(jobFileName)
274+
if (!is.null(obj$options$azure$chunksize)) {
275+
chunkSize <- obj$options$azure$chunksize
276+
}
302277

303-
resourceFiles <- list()
304-
if (!is.null(obj$options$azure$resourceFiles)) {
305-
resourceFiles <- obj$options$azure$resourceFiles
306-
}
278+
if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
279+
chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
280+
}
307281

308-
if (!is.null(obj$options$azure$resourcefiles)) {
309-
resourceFiles <- obj$options$azure$resourcefiles
310-
}
282+
chunkSizeKeyValuePair <-
283+
list(name = "chunkSize", value = as.character(chunkSize))
284+
285+
if (is.null(obj$packages)) {
286+
metadata <-
287+
list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
288+
} else {
289+
packagesKeyValuePair <-
290+
list(name = "packages",
291+
value = paste(obj$packages, collapse = ";"))
292+
293+
metadata <-
294+
list(enableCloudCombineKeyValuePair,
295+
chunkSizeKeyValuePair,
296+
packagesKeyValuePair)
297+
}
311298

312-
sasToken <- rAzureBatch::createSasToken("r", "c", id)
313-
workerScriptUrl <-
314-
rAzureBatch::createBlobUrl(storageCredentials$name, id, "worker.R", sasToken)
315-
mergerScriptUrl <-
316-
rAzureBatch::createBlobUrl(storageCredentials$name, id, "merger.R", sasToken)
317-
installGithubScriptUrl <-
318-
rAzureBatch::createBlobUrl(storageCredentials$name,
319-
id,
320-
"install_github.R",
321-
sasToken)
322-
installCranScriptUrl <-
323-
rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_cran.R", sasToken)
324-
installBioConductorScriptUrl <-
325-
rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_bioconductor.R", sasToken)
326-
jobCommonFileUrl <-
327-
rAzureBatch::createBlobUrl(storageCredentials$name, id, jobFileName, sasToken)
328-
329-
requiredJobResourceFiles <- list(
330-
rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
331-
rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
332-
rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
333-
rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
334-
rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
335-
rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
299+
retryCounter <- 0
300+
maxRetryCount <- 5
301+
startupFolderName <- "startup"
302+
repeat {
303+
if (retryCounter > maxRetryCount) {
304+
stop(
305+
sprintf(
306+
"Error creating job: Maximum number of retries (%d) exceeded",
307+
maxRetryCount
308+
)
336309
)
310+
}
311+
else {
312+
retryCounter <- retryCounter + 1
313+
}
337314

338-
# We need to merge any files passed by the calling lib with the resource files specified here
339-
340-
resourceFiles <-
341-
append(resourceFiles, requiredJobResourceFiles)
342-
343-
enableCloudCombineKeyValuePair <-
344-
list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
315+
containerResponse <- rAzureBatch::createContainer(id, content = "response")
345316

346-
chunkSize <- 1
317+
if (containerResponse$status_code >= 400 && containerResponse$status_code <= 499) {
318+
containerContent <- xml2::as_list(httr::content(containerResponse))
347319

348-
if (!is.null(obj$options$azure$chunkSize)) {
349-
chunkSize <- obj$options$azure$chunkSize
320+
if (!is.null(obj$options$azure$job) && containerContent$Code[[1]] == "ContainerAlreadyExists") {
321+
stop(paste("Error creating job: Job's storage container already exists for an unique job id.",
322+
"Either delete the storage container or change the job argument in the foreach."))
350323
}
351324

352-
if (!is.null(obj$options$azure$chunksize)) {
353-
chunkSize <- obj$options$azure$chunksize
354-
}
325+
Sys.sleep(retryCounter * retryCounter)
355326

356-
if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
357-
chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
358-
}
327+
time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
328+
id <- sprintf("%s%s",
329+
"job",
330+
time)
331+
next
332+
}
333+
else if (containerResponse$status_code >= 500 && containerResponse$status_code <= 599) {
334+
containerContent <- xml2::as_list(httr::content(containerResponse))
335+
stop(paste0("Error creating job: ", containerContent$message$value))
336+
}
359337

360-
chunkSizeKeyValuePair <-
361-
list(name = "chunkSize", value = as.character(chunkSize))
362-
363-
if (is.null(obj$packages)) {
364-
metadata <-
365-
list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
366-
} else {
367-
packagesKeyValuePair <-
368-
list(name = "packages",
369-
value = paste(obj$packages, collapse = ";"))
370-
371-
metadata <-
372-
list(enableCloudCombineKeyValuePair,
373-
chunkSizeKeyValuePair,
374-
packagesKeyValuePair)
375-
}
338+
# Uploading common job files for the worker node
339+
rAzureBatch::uploadBlob(id,
340+
system.file(startupFolderName, "worker.R", package = "doAzureParallel"))
341+
rAzureBatch::uploadBlob(id,
342+
system.file(startupFolderName, "merger.R", package = "doAzureParallel"))
343+
rAzureBatch::uploadBlob(
344+
id,
345+
system.file(startupFolderName, "install_github.R", package = "doAzureParallel")
346+
)
347+
rAzureBatch::uploadBlob(
348+
id,
349+
system.file(startupFolderName, "install_cran.R", package = "doAzureParallel")
350+
)
351+
rAzureBatch::uploadBlob(
352+
id,
353+
system.file(startupFolderName, "install_bioconductor.R", package = "doAzureParallel")
354+
)
376355

377-
response <- .addJob(
378-
jobId = id,
379-
poolId = data$poolId,
380-
resourceFiles = resourceFiles,
381-
metadata = metadata,
382-
packages = obj$packages
383-
)
356+
# Creating common job environment for all tasks
357+
jobFileName <- paste0(id, ".rds")
358+
saveRDS(.doAzureBatchGlobals, file = jobFileName)
359+
rAzureBatch::uploadBlob(id, paste0(getwd(), "/", jobFileName))
360+
file.remove(jobFileName)
361+
362+
# Creating read-only SAS token blob resource file urls
363+
sasToken <- rAzureBatch::createSasToken("r", "c", id)
364+
workerScriptUrl <-
365+
rAzureBatch::createBlobUrl(storageCredentials$name, id, "worker.R", sasToken)
366+
mergerScriptUrl <-
367+
rAzureBatch::createBlobUrl(storageCredentials$name, id, "merger.R", sasToken)
368+
installGithubScriptUrl <-
369+
rAzureBatch::createBlobUrl(storageCredentials$name,
370+
id,
371+
"install_github.R",
372+
sasToken)
373+
installCranScriptUrl <-
374+
rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_cran.R", sasToken)
375+
installBioConductorScriptUrl <-
376+
rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_bioconductor.R", sasToken)
377+
jobCommonFileUrl <-
378+
rAzureBatch::createBlobUrl(storageCredentials$name, id, jobFileName, sasToken)
379+
380+
requiredJobResourceFiles <- list(
381+
rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
382+
rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
383+
rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
384+
rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
385+
rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
386+
rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
387+
)
384388

385-
if (grepl("ActiveJobAndScheduleQuotaReached", response)) {
386-
jobquotaReachedResponse <-
387-
grepl("ActiveJobAndScheduleQuotaReached", response)
388-
}
389+
resourceFiles <-
390+
append(resourceFiles, requiredJobResourceFiles)
389391

390-
if (grepl("JobExists", response)) {
391-
stop("The specified job already exists.")
392-
}
392+
response <- .addJob(
393+
jobId = id,
394+
poolId = data$poolId,
395+
resourceFiles = resourceFiles,
396+
metadata = metadata,
397+
packages = obj$packages
398+
)
393399

400+
if (response$status_code == 201) {
394401
break
402+
}
403+
else {
404+
jobContent <- httr::content(response, content = "parsed")
395405

396-
},
397-
error = function(e) {
398-
if (retryCounter == maxRetryCount) {
399-
cat(sprintf("Error creating job: %s\n",
400-
conditionMessage(e)))
406+
if (jobContent$code == "JobExists" && !is.null(obj$options$azure$job)) {
407+
stop(paste("Error in creating job: Job already exists with the unique job id.",
408+
"Either delete the job or change the job argument in the foreach loop.",
409+
jobContent$message$value))
410+
}
411+
else if (jobContent$code == "JobExists") {
412+
Sys.sleep(retryCounter * retryCounter)
413+
414+
time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
415+
id <- sprintf("%s%s",
416+
"job",
417+
time)
418+
next
401419
}
402420

403-
print(e)
404-
time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
405-
id <- sprintf("%s%s",
406-
"job",
407-
time)
408-
})
409-
}
410-
411-
if (!is.null(containerResponse)) {
412-
stop(
413-
"Aborted mission. The container has already exist with user's specific job id. Please use a different job id."
414-
)
415-
}
421+
if (jobContent$code == "ActiveJobAndScheduleQuotaReached") {
422+
stop(
423+
paste(
424+
"Error in creating job: Your active job quota has been reached.",
425+
"To increase your active job quota,",
426+
"go to https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit"
427+
)
428+
)
429+
}
416430

417-
if (!is.null(jobquotaReachedResponse)) {
418-
stop(
419-
paste0(
420-
"Aborted mission. Your active job quota has been reached. To increase your active job quota, ",
421-
"go to https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit"
422-
)
423-
)
431+
stop("Error in creating job: ", jobContent$message$value)
432+
}
424433
}
425434

426435
cat("Job Summary: ", fill = TRUE)

R/helpers.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@
169169
poolInfo = poolInfo,
170170
jobPreparationTask = jobPreparationTask,
171171
usesTaskDependencies = usesTaskDependencies,
172-
content = "text",
172+
content = "response",
173173
metadata = metadata
174174
)
175175

0 commit comments

Comments
 (0)