This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Fix: Upgrading R Batch SDK to 2018-12-01.8.0 #354

Merged · 7 commits · Jun 19, 2019
Changes from 6 commits
27 changes: 13 additions & 14 deletions R/batch-api.R
@@ -19,7 +19,7 @@ BatchUtilities <- R6::R6Class(

accountName <- storageClient$authentication$name

-      resourceFiles <- NULL
+      resourceFiles <- args$resourceFiles
if (!is.null(argsList)) {
envFile <- paste0(taskId, ".rds")
saveRDS(argsList, file = envFile)
@@ -37,8 +37,18 @@ BatchUtilities <- R6::R6Class(
envFile,
readToken,
config$endpointSuffix)
-        resourceFiles <-
-          list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
+        environmentResourceFile <-
+          rAzureBatch::createResourceFile(filePath = envFile, httpUrl = envFileUrl)
+
+        if (is.null(resourceFiles))
+        {
+          resourceFiles <-
+            list(environmentResourceFile)
+        }
Review comment: Shouldn't this be reversed: if null, set to a list; else append to the existing list?

Collaborator (Author): Will fix this.

+        else {
+          resourceFiles <- append(resourceFiles, environmentResourceFile)
+        }
}

# Only use the download command if cloudCombine is enabled
@@ -52,17 +62,6 @@ BatchUtilities <- R6::R6Class(

if (!is.null(cloudCombine)) {
assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)
-        containerSettings$imageName <- "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1"
-
-        copyCommand <- sprintf(
-          "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds --endpoint %s",
-          accountName,
-          jobId,
-          "$AZ_BATCH_TASK_WORKING_DIR",
-          config$endpointSuffix
-        )
-
-        commands <- c(paste("blobxfer", copyCommand))
}

exitConditions <- NULL
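For context outside the diff, here is a minimal standalone sketch of the create-or-append pattern this hunk introduces. The names userFiles, envFile, and the SAS URL are illustrative placeholders rather than values from the PR, and the sketch wraps the appended file in list() so it lands as a single element:

# Hypothetical stand-ins for args$resourceFiles and the uploaded task
# environment file; the SAS URL is a placeholder.
userFiles <- NULL  # or a pre-existing list of resource files
envFile <- "task1.rds"
envFileUrl <- "https://<account>.blob.core.windows.net/<container>/task1.rds?<sas>"

environmentResourceFile <-
  rAzureBatch::createResourceFile(filePath = envFile, httpUrl = envFileUrl)

if (is.null(userFiles)) {
  # No caller-supplied files: start a fresh list.
  resourceFiles <- list(environmentResourceFile)
} else {
  # Caller supplied files: append the environment file as one element.
  resourceFiles <- append(userFiles, list(environmentResourceFile))
}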
2 changes: 1 addition & 1 deletion R/cluster.R
@@ -123,7 +123,7 @@ makeCluster <-

# install docker
containerConfiguration <- list(
type = "docker"
type = "dockerCompatible"
)

dockerImage <- "rocker/tidyverse:latest"
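As a rough sketch of what this one-line change affects: under the 2018-12-01.8.0 Batch API the container configuration type string is "dockerCompatible" rather than "docker". The lines below mirror this file's own defaults; treat the standalone framing as an assumption, not the package's full payload:

# Container configuration as assembled in makeCluster(); the newer
# Batch API expects "dockerCompatible" where older versions took "docker".
containerConfiguration <- list(
  type = "dockerCompatible"
)

# Default container image used by this file.
dockerImage <- "rocker/tidyverse:latest"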
32 changes: 24 additions & 8 deletions R/doAzureParallel.R
@@ -474,12 +474,12 @@ setHttpTraffic <- function(value = FALSE) {
storageEndpointSuffix = config$endpointSuffix)

requiredJobResourceFiles <- list(
-    rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
-    rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
-    rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
-    rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
-    rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
-    rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
+    rAzureBatch::createResourceFile(filePath = "worker.R", httpUrl = workerScriptUrl),
+    rAzureBatch::createResourceFile(filePath = "merger.R", httpUrl = mergerScriptUrl),
+    rAzureBatch::createResourceFile(filePath = "install_github.R", httpUrl = installGithubScriptUrl),
+    rAzureBatch::createResourceFile(filePath = "install_cran.R", httpUrl = installCranScriptUrl),
+    rAzureBatch::createResourceFile(filePath = "install_bioconductor.R", httpUrl = installBioConductorScriptUrl),
+    rAzureBatch::createResourceFile(filePath = jobFileName, httpUrl = jobCommonFileUrl)
)

resourceFiles <-
@@ -669,6 +669,21 @@ setHttpTraffic <- function(value = FALSE) {
)
)

+    mergeReadSasToken <- storageClient$generateSasToken("rl", "c", id)
+    mergeResourceFileUrl <-
+      rAzureBatch::createBlobUrl(
+        storageAccount = storageClient$authentication$name,
+        containerName = id,
+        sasToken = mergeReadSasToken,
+        storageEndpointSuffix = config$endpointSuffix
+      )
+
+    mergeResources <-
+      list(
+        rAzureBatch::createResourceFile(
+          storageContainerUrl = mergeResourceFileUrl,
+          blobPrefix = "results"))

BatchUtilitiesOperations$addTask(
jobId = id,
taskId = "merge",
@@ -684,7 +699,8 @@ setHttpTraffic <- function(value = FALSE) {
dependsOn = taskDependencies,
cloudCombine = cloudCombine,
outputFiles = append(obj$options$azure$outputFiles, mergeOutput),
-      containerImage = data$containerImage
+      containerImage = data$containerImage,
+      resourceFiles = mergeResources
)

cat(". . .")
Expand Down Expand Up @@ -726,7 +742,7 @@ setHttpTraffic <- function(value = FALSE) {
}

if (!identical(function(a, ...) c(a, list(...)),
-                 obj$combineInfo$fun, ignore.environment = TRUE)){
+                 obj$combineInfo$fun, ignore.environment = TRUE)) {
tryCatch({
accumulator <- foreach::makeAccum(it)
accumulator(results, as.numeric(names(results)))
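The merge task now receives its inputs through a storage-container resource file with a blob prefix, replacing the blobxfer download command removed from batch-api.R. A minimal sketch of that pattern, with placeholder account, container, and token values standing in for the objects the package builds above:

# Placeholders for the storage account, job container, and read-only
# SAS token that doAzureParallel derives from its storage client.
storageAccount <- "<storageaccount>"
jobContainer <- "<jobid>"
readSasToken <- "<sas-token-with-read-list-permissions>"

# Container-level URL; Batch downloads every blob under the
# "results" prefix into the task's working directory.
containerUrl <- rAzureBatch::createBlobUrl(
  storageAccount = storageAccount,
  containerName = jobContainer,
  sasToken = readSasToken,
  storageEndpointSuffix = "core.windows.net"
)

mergeResources <- list(
  rAzureBatch::createResourceFile(
    storageContainerUrl = containerUrl,
    blobPrefix = "results"
  )
)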
23 changes: 7 additions & 16 deletions R/utility-job.R
@@ -472,19 +472,14 @@ waitForTasksToComplete <-

flush.console()

-      validationFlag <-
-        (taskCounts$validationStatus == "Validated" &&
-           totalTasks <= 200000) ||
-        totalTasks > 200000

      if (taskCounts$failed > 0 &&
-          errorHandling == "stop" &&
-          validationFlag) {
+          errorHandling == "stop") {
cat("\n")

select <- "id, executionInfo"
+        filter <- "executionInfo/result eq 'failure'"
        failedTasks <-
-          batchClient$taskOperations$list(jobId, select = select)
+          batchClient$taskOperations$list(jobId, select = select, filter = filter)

tasksFailureWarningLabel <-
sprintf(
Expand All @@ -498,14 +493,9 @@ waitForTasksToComplete <-
)

for (i in 1:length(failedTasks$value)) {
-          if (!is.null(failedTasks$value[[i]]$executionInfo$result) &&
-              grepl(failedTasks$value[[i]]$executionInfo$result,
-                    "failure",
-                    ignore.case = TRUE)) {
            tasksFailureWarningLabel <-
              paste0(tasksFailureWarningLabel,
                     sprintf("%s\n", failedTasks$value[[i]]$id))
-          }
}

warning(sprintf(tasksFailureWarningLabel,
@@ -533,9 +523,10 @@ waitForTasksToComplete <-
jobId)
}

Review comment: The counting service notoriously runs into problems. Might need to change to doing smart list calls in the future.

Collaborator (Author): I've added the job state to the check.

-      if (taskCounts$completed >= totalTasks &&
-          (taskCounts$validationStatus == "Validated" ||
-             totalTasks >= 200000)) {
+      jobInfo <- getJob(jobId, verbose = FALSE)
+      if (taskCounts$completed >= totalTasks ||
+          jobInfo$state == "completed" ||
+          jobInfo$state == "terminating") {
cat("\n")
break
}
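Pulling the two changes together: failed tasks are now filtered server-side with an OData expression, and the wait loop exits on job state rather than the task-count validation flag. A condensed sketch of the resulting loop body, where batchClient, getJob, taskCounts, totalTasks, and jobId stand in for objects the surrounding function already holds, and the polling interval is illustrative:

repeat {
  # ... task-count polling and progress reporting elided ...

  # Ask the Batch service for only the failed tasks instead of
  # listing everything and filtering client-side.
  select <- "id, executionInfo"
  filter <- "executionInfo/result eq 'failure'"
  failedTasks <-
    batchClient$taskOperations$list(jobId, select = select, filter = filter)

  # Leave the loop once every task has completed, or once the job
  # itself reports that it is finished or being torn down.
  jobInfo <- getJob(jobId, verbose = FALSE)
  if (taskCounts$completed >= totalTasks ||
      jobInfo$state == "completed" ||
      jobInfo$state == "terminating") {
    break
  }

  Sys.sleep(10)  # illustrative polling interval
}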
8 changes: 4 additions & 4 deletions docs/71-distributing-data.md
@@ -39,12 +39,12 @@ Here's an example that uses data stored in a public location on Azure Blob Storage
# define where to download data from
resource_files = list(
rAzureBatch::createResourceFile(
url = "https://<accountname>.blob.core.windows.net/<container>/2010.csv",
fileName = "2010.csv"
httpUrl = "https://<accountname>.blob.core.windows.net/<container>/2010.csv",
filePath = "2010.csv"
),
rAzureBatch::createResourceFile(
url = "https://<accountname>.blob.core.windows.net/<container>/2011.csv",
fileName = "2011.csv"
httpUrl = "https://<accountname>.blob.core.windows.net/<container>/2011.csv",
filePath = "2011.csv"
)
)

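For readers updating their own scripts against the newer rAzureBatch, the rename is mechanical: url becomes httpUrl and fileName becomes filePath. A before/after sketch using the placeholder URL from this doc:

# Old signature (pre-2018-12-01.8.0):
# rAzureBatch::createResourceFile(
#   url = "https://<accountname>.blob.core.windows.net/<container>/2010.csv",
#   fileName = "2010.csv")

# New signature:
rAzureBatch::createResourceFile(
  httpUrl = "https://<accountname>.blob.core.windows.net/<container>/2010.csv",
  filePath = "2010.csv"
)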
12 changes: 6 additions & 6 deletions samples/resource_files/resource_files_example.R
@@ -34,12 +34,12 @@ doAzureParallel::setCredentials("credentials.json")
# Using the NYC taxi datasets, http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml
azureStorageUrl <- "http://playdatastore.blob.core.windows.net/nyc-taxi-dataset"
resource_files <- list(
-  rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), fileName = "yellow_tripdata_2016-1.csv"),
-  rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), fileName = "yellow_tripdata_2016-2.csv"),
-  rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), fileName = "yellow_tripdata_2016-3.csv"),
-  rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), fileName = "yellow_tripdata_2016-4.csv"),
-  rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), fileName = "yellow_tripdata_2016-5.csv"),
-  rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), fileName = "yellow_tripdata_2016-6.csv")
+  rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), filePath = "yellow_tripdata_2016-1.csv"),
+  rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), filePath = "yellow_tripdata_2016-2.csv"),
+  rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), filePath = "yellow_tripdata_2016-3.csv"),
+  rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), filePath = "yellow_tripdata_2016-4.csv"),
+  rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), filePath = "yellow_tripdata_2016-5.csv"),
+  rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), filePath = "yellow_tripdata_2016-6.csv")
)

# add the parameter 'resourceFiles' to download files to nodes
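Assuming the sample continues the way its closing comment suggests, the list is handed to makeCluster so every node downloads the files at start-up; the cluster config filename and the resourceFiles parameter usage here are hypothetical continuations, not lines from the PR:

# Hypothetical continuation of the sample: pass the resource files
# when creating the cluster; each node downloads them on start-up.
cluster <- doAzureParallel::makeCluster(
  "resource_files_cluster.json",
  resourceFiles = resource_files
)
doAzureParallel::registerDoAzureParallel(cluster)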
4 changes: 2 additions & 2 deletions samples/sas_resource_files/sas_resources_files_example.R
@@ -56,8 +56,8 @@ csvFileUrl2 <- rAzureBatch::createBlobUrl(storageAccount = storageAccountName,
# Create a list of files to download to the cluster using read-only permissions
# Place the files in a directory called 'data'
resource_files = list(
-  rAzureBatch::createResourceFile(url = csvFileUrl1, fileName = "data/1989.csv"),
-  rAzureBatch::createResourceFile(url = csvFileUrl2, fileName = "data/1990.csv")
+  rAzureBatch::createResourceFile(httpUrl = csvFileUrl1, filePath = "data/1989.csv"),
+  rAzureBatch::createResourceFile(httpUrl = csvFileUrl2, filePath = "data/1990.csv")
)

# Create the cluster