This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 9b8f70a

merge from master

2 parents: 6c2fd92 + 4eb3773

5 files changed: +195, -154 lines

DESCRIPTION

Lines changed: 2 additions & 1 deletion
@@ -10,7 +10,8 @@ Description: The project is for data experts who use R at scale. The project
     maintained by Azure Batch and, for the initial version, will be a public/
     communal pool. The orchestration for each job that needs to be parallelized in
     the cluster will be done by a middle layer that schedules each request.
-License: Microsoft Corporation
+Copyright: Microsoft
+License: MIT + file LICENSE
 LazyData: TRUE
 Depends:
     foreach (>= 1.4.3),

R/doAzureParallel.R

Lines changed: 159 additions & 148 deletions
@@ -253,190 +253,201 @@ setHttpTraffic <- function(value = FALSE) {
   assign("enableCloudCombine", enableCloudCombine, envir = .doAzureBatchGlobals)
   assign("cloudCombine", cloudCombine, envir = .doAzureBatchGlobals)

-  retryCounter <- 0
-  maxRetryCount <- 5
-  startupFolderName <- "startup"
-  containerResponse <- NULL
-  jobquotaReachedResponse <- NULL
-  while (retryCounter < maxRetryCount) {
-    sprintf("job id is: %s", id)
-    # try to submit the job. We may run into naming conflicts. If so, try again
-    tryCatch({
-      retryCounter <- retryCounter + 1
+  resourceFiles <- list()
+  if (!is.null(obj$options$azure$resourceFiles)) {
+    resourceFiles <- obj$options$azure$resourceFiles
+  }

-      response <- rAzureBatch::createContainer(id, content = "text")
-      if (grepl("ContainerAlreadyExists", response)) {
-        if (!is.null(obj$options$azure$job)) {
-          containerResponse <- grepl("ContainerAlreadyExists", response)
-          break
+  if (!is.null(obj$options$azure$resourcefiles)) {
+    resourceFiles <- obj$options$azure$resourcefiles
+  }

-        }
+  enableCloudCombineKeyValuePair <-
+    list(name = "enableCloudCombine", value = as.character(enableCloudCombine))

-        stop("Container already exists. Multiple jobs may possibly be running.")
-      }
+  chunkSize <- 1

-      rAzureBatch::uploadBlob(id,
-        system.file(startupFolderName, "worker.R", package = "doAzureParallel"))
-      rAzureBatch::uploadBlob(id,
-        system.file(startupFolderName, "merger.R", package = "doAzureParallel"))
-      rAzureBatch::uploadBlob(
-        id,
-        system.file(startupFolderName, "install_github.R", package = "doAzureParallel")
-      )
-      rAzureBatch::uploadBlob(
-        id,
-        system.file(startupFolderName, "install_cran.R", package = "doAzureParallel")
-      )
-      rAzureBatch::uploadBlob(
-        id,
-        system.file(startupFolderName, "install_bioconductor.R", package = "doAzureParallel")
-      )
+  if (!is.null(obj$options$azure$chunkSize)) {
+    chunkSize <- obj$options$azure$chunkSize
+  }

-      # Setting up common job environment for all tasks
-      jobFileName <- paste0(id, ".rds")
-      saveRDS(.doAzureBatchGlobals, file = jobFileName)
+  if (!is.null(obj$options$azure$chunksize)) {
+    chunkSize <- obj$options$azure$chunksize
+  }

-      rAzureBatch::uploadBlob(id, paste0(getwd(), "/", jobFileName))
+  if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
+    chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
+  }

-      file.remove(jobFileName)
+  chunkSizeKeyValuePair <-
+    list(name = "chunkSize", value = as.character(chunkSize))

-      resourceFiles <- list()
-      if (!is.null(obj$options$azure$resourceFiles)) {
-        resourceFiles <- obj$options$azure$resourceFiles
-      }
+  metadata <-
+    list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)

-      if (!is.null(obj$options$azure$resourcefiles)) {
-        resourceFiles <- obj$options$azure$resourcefiles
-      }
+  metadataCount <- 3
+  if (!is.null(obj$packages)) {
+    packagesKeyValuePair <-
+      list(name = "packages",
+           value = paste(obj$packages, collapse = ";"))

-      sasToken <- rAzureBatch::createSasToken("r", "c", id)
-      workerScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "worker.R", sasToken)
-      mergerScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "merger.R", sasToken)
-      installGithubScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name,
-                                   id,
-                                   "install_github.R",
-                                   sasToken)
-      installCranScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_cran.R", sasToken)
-      installBioConductorScriptUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_bioconductor.R", sasToken)
-      jobCommonFileUrl <-
-        rAzureBatch::createBlobUrl(storageCredentials$name, id, jobFileName, sasToken)
-
-      requiredJobResourceFiles <- list(
-        rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
-        rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
-        rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
-        rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
-        rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
-        rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
-      )
-
-      # We need to merge any files passed by the calling lib with the resource files specified here
+    metadata[[metadataCount]] <- packagesKeyValuePair
+    metadataCount <- metadataCount + 1
+  }

-      resourceFiles <-
-        append(resourceFiles, requiredJobResourceFiles)
+  if (!is.null(obj$errorHandling)) {
+    errorHandlingKeyValuePair <-
+      list(name = "errorHandling",
+           value = as.character(obj$errorHandling))

-      enableCloudCombineKeyValuePair <-
-        list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
+    metadata[[metadataCount]] <- errorHandlingKeyValuePair
+    metadataCount <- metadataCount + 1
+  }

-      chunkSize <- 1
+  retryCounter <- 0
+  maxRetryCount <- 5
+  startupFolderName <- "startup"
+  repeat {
+    if (retryCounter > maxRetryCount) {
+      stop(
+        sprintf(
+          "Error creating job: Maximum number of retries (%d) exceeded",
+          maxRetryCount
+        )
+      )
+    }
+    else {
+      retryCounter <- retryCounter + 1
+    }

-      if (!is.null(obj$options$azure$chunkSize)) {
-        chunkSize <- obj$options$azure$chunkSize
-      }
+    containerResponse <- rAzureBatch::createContainer(id, content = "response")

-      if (!is.null(obj$options$azure$chunksize)) {
-        chunkSize <- obj$options$azure$chunksize
-      }
+    if (containerResponse$status_code >= 400 && containerResponse$status_code <= 499) {
+      containerContent <- xml2::as_list(httr::content(containerResponse))

-      if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
-        chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
+      if (!is.null(obj$options$azure$job) && containerContent$Code[[1]] == "ContainerAlreadyExists") {
+        stop(paste("Error creating job: Job's storage container already exists for a unique job id.",
+                   "Either delete the storage container or change the job argument in the foreach."))
       }

-      chunkSizeKeyValuePair <-
-        list(name = "chunkSize", value = as.character(chunkSize))
+      Sys.sleep(retryCounter * retryCounter)

-      metadata <-
-        list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
+      time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
+      id <- sprintf("%s%s",
+                    "job",
+                    time)
+      next
+    }
+    else if (containerResponse$status_code >= 500 && containerResponse$status_code <= 599) {
+      containerContent <- xml2::as_list(httr::content(containerResponse))
+      stop(paste0("Error creating job: ", containerContent$message$value))
+    }

-      metadataCount <- 3
-      if (!is.null(obj$packages)) {
-        packagesKeyValuePair <-
-          list(name = "packages",
-               value = paste(obj$packages, collapse = ";"))
+    # Uploading common job files for the worker node
+    rAzureBatch::uploadBlob(id,
+      system.file(startupFolderName, "worker.R", package = "doAzureParallel"))
+    rAzureBatch::uploadBlob(id,
+      system.file(startupFolderName, "merger.R", package = "doAzureParallel"))
+    rAzureBatch::uploadBlob(
+      id,
+      system.file(startupFolderName, "install_github.R", package = "doAzureParallel")
+    )
+    rAzureBatch::uploadBlob(
+      id,
+      system.file(startupFolderName, "install_cran.R", package = "doAzureParallel")
+    )
+    rAzureBatch::uploadBlob(
+      id,
+      system.file(startupFolderName, "install_bioconductor.R", package = "doAzureParallel")
+    )

-        metadata[[metadataCount]] <-packagesKeyValuePair
-        metadataCount <- metadataCount + 1
-      }
+    # Creating common job environment for all tasks
+    jobFileName <- paste0(id, ".rds")
+    saveRDS(.doAzureBatchGlobals, file = jobFileName)
+    rAzureBatch::uploadBlob(id, paste0(getwd(), "/", jobFileName))
+    file.remove(jobFileName)
+
+    # Creating read-only SAS token blob resource file urls
+    sasToken <- rAzureBatch::createSasToken("r", "c", id)
+    workerScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "worker.R", sasToken)
+    mergerScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "merger.R", sasToken)
+    installGithubScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name,
+                                 id,
+                                 "install_github.R",
+                                 sasToken)
+    installCranScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_cran.R", sasToken)
+    installBioConductorScriptUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, "install_bioconductor.R", sasToken)
+    jobCommonFileUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, id, jobFileName, sasToken)
+
+    requiredJobResourceFiles <- list(
+      rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
+      rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
+      rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
+      rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
+      rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
+      rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
+    )

-      if (!is.null(obj$errorHandling)) {
-        errorHandlingKeyValuePair <-
-          list(name = "errorHandling",
-               value = as.character(obj$errorHandling))
+    resourceFiles <-
+      append(resourceFiles, requiredJobResourceFiles)

-        metadata[[metadataCount]] <- errorHandlingKeyValuePair
-        metadataCount <- metadataCount + 1
-      }
+    response <- .addJob(
+      jobId = id,
+      poolId = data$poolId,
+      resourceFiles = resourceFiles,
+      metadata = metadata,
+      packages = obj$packages
+    )

-      response <- .addJob(
-        jobId = id,
-        poolId = data$poolId,
-        resourceFiles = resourceFiles,
-        metadata = metadata,
-        packages = obj$packages
-      )
+    if (response$status_code == 201) {
+      break
+    }
+    else {
+      jobContent <- httr::content(response, content = "parsed")

-      if (grepl("ActiveJobAndScheduleQuotaReached", response)) {
-        jobquotaReachedResponse <-
-          grepl("ActiveJobAndScheduleQuotaReached", response)
+      if (jobContent$code == "JobExists" && !is.null(obj$options$azure$job)) {
+        stop(paste("Error in creating job: Job already exists with the unique job id.",
+                   "Either delete the job or change the job argument in the foreach loop.",
+                   jobContent$message$value))
       }
-
-      if (grepl("JobExists", response)) {
-        stop("The specified job already exists.")
+      else if (jobContent$code == "JobExists") {
+        Sys.sleep(retryCounter * retryCounter)
+
+        time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
+        id <- sprintf("%s%s",
+                      "job",
+                      time)
+        next
       }

-      saveMetadataBlob(id, metadata)
-
-      break
-
-    },
-    error = function(e) {
-      if (retryCounter == maxRetryCount) {
-        cat(sprintf("Error creating job: %s\n",
-            conditionMessage(e)))
+      if (jobContent$code == "ActiveJobAndScheduleQuotaReached") {
+        stop(
+          paste(
+            "Error in creating job: Your active job quota has been reached.",
+            "To increase your active job quota,",
+            "go to https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit"
+          )
+        )
       }

-      print(e)
-      time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
-      id <- sprintf("%s%s",
-                    "job",
-                    time)
-    })
-  }
-
-  if (!is.null(containerResponse)) {
-    stop(
-      "Aborted mission. The container has already exist with user's specific job id. Please use a different job id."
-    )
-  }
-
-  if (!is.null(jobquotaReachedResponse)) {
-    stop(
-      paste0(
-        "Aborted mission. Your active job quota has been reached. To increase your active job quota, ",
-        "go to https://docs.microsoft.com/en-us/azure/batch/batch-quota-limit"
-      )
-    )
+      stop("Error in creating job: ", jobContent$message$value)
+    }
   }

   cat("Job Summary: ", fill = TRUE)
   job <- rAzureBatch::getJob(id)
   cat(sprintf("Id: %s", job$id), fill = TRUE)

+  if (!is.null(job$id)) {
+    saveMetadataBlob(job$id, metadata)
+  }
+
   ntasks <- length(argsList)

   startIndices <- seq(1, length(argsList), chunkSize)
R/helpers.R

Lines changed: 1 addition & 1 deletion
@@ -169,7 +169,7 @@
       poolInfo = poolInfo,
       jobPreparationTask = jobPreparationTask,
       usesTaskDependencies = usesTaskDependencies,
-      content = "text",
+      content = "response",
       metadata = metadata
     )
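
This one-line change pairs with the job-creation rewrite above: with content = "response", rAzureBatch hands back the raw httr response object rather than a pre-parsed text body, so callers can branch on status_code and parse the error payload only on failure. A short sketch of the consuming side, based on the calls used in this commit (the Code element follows the Azure Storage XML error schema and may vary by operation):

containerResponse <- rAzureBatch::createContainer(id, content = "response")

if (containerResponse$status_code >= 400) {
  # Parse the XML error body only when the request failed
  containerContent <- xml2::as_list(httr::content(containerResponse))
  errorCode <- containerContent$Code[[1]]  # e.g. "ContainerAlreadyExists"
}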
