@@ -186,6 +186,9 @@ waitForTasksToComplete <-
186186 totalTasks <- 0
187187 currentTasks <- rAzureBatch :: listTask(jobId )
188188
189+ jobInfo <- getJob(jobId , verbose = FALSE )
190+ enableCloudCombine <- as.logical(jobInfo $ metadata $ enableCloudCombine )
191+
189192 if (is.null(currentTasks $ value )) {
190193 stop(paste0(" Error: " , currentTasks $ message $ value ))
191194 return ()
@@ -213,34 +216,42 @@ waitForTasksToComplete <-
213216 totalTasks <- totalTasks + length(currentTasks $ value )
214217 }
215218
219+ if (enableCloudCombine ) {
220+ totalTasks <- totalTasks - 1
221+ }
222+
216223 timeToTimeout <- Sys.time() + timeout
217224
218225 repeat {
219226 taskCounts <- rAzureBatch :: getJobTaskCounts(jobId )
220- progressBarValue <- round(taskCounts $ completed / totalTasks * getOption(" width" ))
221227
222- if (taskCounts $ completed == totalTasks - 1 ) {
223- status <- " Tasks have completed. Merging results"
224- }
225- else {
226- status <- " "
228+ # Assumption: Merge task will always be the last one in the queue
229+ if (enableCloudCombine ) {
230+ if (taskCounts $ completed > totalTasks ) {
231+ taskCounts $ completed <- totalTasks
232+ }
233+
234+ if (taskCounts $ active > = 1 ) {
235+ taskCounts $ active <- taskCounts $ active - 1
236+ }
227237 }
228238
229- outputProgressBar <- sprintf(" |%s%s|" ,
230- strrep(" =" , progressBarValue ),
231- strrep(" " , getOption(" width" ) - progressBarValue ))
232- outputTaskCount <- sprintf(" %s (%s/%s)" ,
233- sprintf(" %.2f%%" , (taskCounts $ completed / totalTasks ) * 100 ),
234- taskCounts $ completed ,
235- totalTasks )
236- outputTaskCount <- sprintf(" %s %s" ,
237- outputTaskCount ,
238- strrep(" " , getOption(" width" ) - nchar(as.character(outputTaskCount ))))
239-
240- cat(' \r ' , sprintf(" %s %s %s" ,
241- outputProgressBar ,
242- outputTaskCount ,
243- status ))
239+ runningOutput <- paste0(" Running: " , taskCounts $ running )
240+ queueOutput <- paste0(" Queue: " , taskCounts $ active )
241+ completedOutput <- paste0(" Completed: " , taskCounts $ completed )
242+ failedOutput <- paste0(" Failed: " , taskCounts $ failed )
243+
244+ cat(" \r " ,
245+ sprintf(" | %s | %s | %s | %s | %s |" ,
246+ paste0(" Progress: " , sprintf(" %.2f%% (%s/%s)" , (taskCounts $ completed / totalTasks ) * 100 ,
247+ taskCounts $ completed ,
248+ totalTasks )),
249+ runningOutput ,
250+ queueOutput ,
251+ completedOutput ,
252+ failedOutput ),
253+ sep = " " )
254+
244255 flush.console()
245256
246257 validationFlag <-
@@ -269,7 +280,8 @@ waitForTasksToComplete <-
269280 )
270281
271282 for (i in 1 : length(failedTasks $ value )) {
272- if (failedTasks $ value [[i ]]$ executionInfo $ result == " Failure" ) {
283+ if (! is.null(failedTasks $ value [[i ]]$ executionInfo $ result ) &&
284+ failedTasks $ value [[i ]]$ executionInfo $ result == " Failure" ) {
273285 tasksFailureWarningLabel <-
274286 paste0(tasksFailureWarningLabel ,
275287 sprintf(" %s\n " , failedTasks $ value [[i ]]$ id ))
@@ -283,18 +295,7 @@ waitForTasksToComplete <-
283295 httr :: stop_for_status(response )
284296
285297 stop(sprintf(
286- paste(
287- " Errors have occurred while running the job '%s'." ,
288- " Error handling is set to 'stop' and has proceeded to terminate the job." ,
289- " The user will have to handle deleting the job." ,
290- " If this is not the correct behavior, change the errorHandling property to 'pass'" ,
291- " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs." ,
292- " For more information about getting job logs, follow this link:" ,
293- paste0(
294- " https://github.com/Azure/doAzureParallel/blob/master/docs/" ,
295- " 40-troubleshooting.md#viewing-files-directly-from-compute-node"
296- )
297- ),
298+ getTaskFailedErrorString(" Errors have occurred while running the job '%s'." ),
298299 jobId
299300 ))
300301 }
@@ -316,11 +317,48 @@ waitForTasksToComplete <-
316317 (taskCounts $ validationStatus == " Validated" ||
317318 totalTasks > = 200000 )) {
318319 cat(" \n " )
319- return ( 0 )
320+ break
320321 }
321322
322323 Sys.sleep(10 )
323324 }
325+
326+ cat(" Tasks have completed. " )
327+ if (enableCloudCombine ) {
328+ cat(" Merging results" )
329+
330+ # Wait for merge task to complete
331+ repeat {
332+ # Verify that the merge cloud task didn't have any errors
333+ mergeTask <- rAzureBatch :: getTask(jobId , paste0(jobId , " -merge" ))
334+
335+ # This test needs to go first as Batch service will not return an execution info as null
336+ if (is.null(mergeTask $ executionInfo $ result )) {
337+ cat(" ." )
338+ Sys.sleep(5 )
339+ next
340+ }
341+
342+ if (mergeTask $ executionInfo $ result == " Success" ) {
343+ break
344+ }
345+ else {
346+ rAzureBatch :: terminateJob(jobId )
347+
348+ # The foreach will not be able to run properly if the merge task fails
349+ # Stopping the user from processing a merge task that has failed
350+ stop(sprintf(
351+ getTaskFailedErrorString(" An error has occurred in the merge task of the job '%s'." ),
352+ jobId
353+ ))
354+ }
355+
356+ cat(" ." )
357+ Sys.sleep(5 )
358+ }
359+ }
360+
361+ cat(" \n " )
324362 }
325363
326364waitForJobPreparation <- function (jobId , poolId ) {
0 commit comments