From daaac6b30d095f762ef69363aa26ce2d98f485e3 Mon Sep 17 00:00:00 2001 From: zfengms Date: Fri, 12 Jan 2018 12:49:27 -0800 Subject: [PATCH 1/6] show node status in getCluster --- R/cluster.R | 79 +++++++++------------ R/utility.R | 26 ++++--- samples/long_running_job/long_running_job.R | 2 +- tests/testthat/test-async-cluster.R | 22 +++--- tests/testthat/test-live.R | 2 +- tests/testthat/test-set-credentials.R | 2 +- 6 files changed, 67 insertions(+), 66 deletions(-) diff --git a/R/cluster.R b/R/cluster.R index ff3bb56f..8e6e683e 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -239,8 +239,7 @@ makeCluster <- poolConfig$name), fill = TRUE) - while (areShallowEqual(rAzureBatch::getPool(poolConfig$name)$state, - "deleting")) { + while (rAzureBatch::getPool(poolConfig$name)$state == "deleting") { cat(".") Sys.sleep(10) } @@ -321,26 +320,9 @@ makeCluster <- if (!grepl("PoolExists", response)) { waitForNodesToComplete(poolConfig$name, 60000) } - - cat("Your cluster has been registered.", fill = TRUE) - cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), - fill = TRUE) - cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), - fill = TRUE) - - config$poolId <- poolConfig$name - options("az_config" = config) - return(getOption("az_config")) - } else { - print( - paste0( - "Because the 'wait' parameter is set to FALSE, the returned value is cluster name ", - "Use this returned value with getCluster(clusterName) to get the cluster when the ", - "cluster is created in Azure" - ) - ) - return (poolConfig$name) } + + return(getCluster(poolConfig$name)) } #' Gets the cluster from your Azure account. @@ -352,7 +334,7 @@ makeCluster <- #' cluster <- getCluster("myCluster") #' } #' @export -getCluster <- function(clusterName) { +getCluster <- function(clusterName, verbose = TRUE) { pool <- rAzureBatch::getPool(clusterName) if (!is.null(pool$code) && !is.null(pool$message)) { @@ -360,12 +342,9 @@ getCluster <- function(clusterName) { } if (pool$targetDedicatedNodes + pool$targetLowPriorityNodes <= 0) { - stop("Pool count needs to be greater than 0.") + stop("Cluster node count needs to be greater than 0.") } - totalNodes <- - pool$targetDedicatedNodes + pool$targetLowPriorityNodes - if (!is.null(pool$resizeErrors)) { cat("\n") @@ -387,31 +366,39 @@ getCluster <- function(clusterName) { nodes <- rAzureBatch::listPoolNodes(clusterName) - currentNodeCount <- 0 if (!is.null(nodes$value) && length(nodes$value) > 0) { - nodesStatus <- .processNodeCount(nodes) - - currentNodeCount <- nodesStatus$currentNodeCount - nodesWithFailures <- nodesStatus$nodesWithFailures + nodesInfo <- .processNodeCount(nodes) + nodesState <- nodesInfo$nodesState + nodesWithFailures <- nodesInfo$nodesWithFailures + + if (verbose == TRUE) { + cat("\nnodes:", fill = TRUE) + cat(sprintf("\tidle: %s", nodesState$idle), fill = TRUE) + cat(sprintf("\tcreating: %s", nodesState$creating), fill = TRUE) + cat(sprintf("\tstarting: %s", nodesState$starting), fill = TRUE) + cat(sprintf("\twaitingforstarttask: %s", nodesState$waitingforstarttask), fill = TRUE) + cat(sprintf("\tstarttaskfailed: %s", nodesState$starttaskfailed), fill = TRUE) + cat(sprintf("\tpreempted: %s", nodesState$preempted), fill = TRUE) + cat(sprintf("\trunning: %s", nodesState$running), fill = TRUE) + } .showNodesFailure(nodesWithFailures) } - if (currentNodeCount >= totalNodes) { - config <- getOption("az_config") - cat("Your cluster has been registered.", fill = TRUE) - cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), - fill = TRUE) - cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), - fill = TRUE) - - config$poolId <- clusterName - options("az_config" = config) - return(getOption("az_config")) - } else { - cat("Your cluster is not ready yet.", fill = TRUE) - return (NULL) - } + cat("Your cluster has been registered.", fill = TRUE) + + config <- getOption("az_config") + config$nodesState <- nodesState + config$targetDedicatedNodes <- pool$targetDedicatedNodes + config$targetLowPriorityNodes <- pool$targetLowPriorityNodes + cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), + fill = TRUE) + cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), + fill = TRUE) + + config$poolId <- clusterName + options("az_config" = config) + return(getOption("az_config")) } #' Deletes the cluster from your Azure account. diff --git a/R/utility.R b/R/utility.R index c6d1cd04..e43a72f8 100644 --- a/R/utility.R +++ b/R/utility.R @@ -56,10 +56,10 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { nodes <- rAzureBatch::listPoolNodes(poolId) if (!is.null(nodes$value) && length(nodes$value) > 0) { - nodesStatus <- .processNodeCount(nodes) + nodesInfo <- .processNodeCount(nodes) - currentProgressBarCount <- nodesStatus$currentNodeCount - nodesWithFailures <- nodesStatus$nodesWithFailures + currentProgressBarCount <- nodesInfo$currentNodeCount + nodesWithFailures <- nodesInfo$nodesWithFailures if (currentProgressBarCount >= pb$getVal()) { setTxtProgressBar(pb, currentProgressBarCount) @@ -83,7 +83,19 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { .processNodeCount <- function(nodes) { nodesWithFailures <- c() currentNodeCount <- 0 + nodesState <- list( + idle = as.integer(0), + creating = as.integer(0), + starting = as.integer(0), + waitingforstarttask = as.integer(0), + starttaskfailed = as.integer(0), + preempted = as.integer(0), + running = as.integer(0) + ) + for (i in 1:length(nodes$value)) { + nodesState[[nodes$value[[i]]$state]] = nodesState[[nodes$value[[i]]$state]] + as.integer(1) + # The progress total count is the number of the nodes. Each node counts as 1. # If a node is not in idle, prempted, running, or start task failed, the value is # less than 1. The default value is 0 because the node has not been allocated to @@ -99,7 +111,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { "starting" = { 0.50 }, - "waitingforstartask" = { + "waitingforstarttask" = { 0.75 }, "starttaskfailed" = { @@ -118,7 +130,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { currentNodeCount <- currentNodeCount + nodeValue } - return(list(currentNodeCount = currentNodeCount, nodesWithFailures = nodesWithFailures)) + return(list(currentNodeCount = currentNodeCount, nodesWithFailures = nodesWithFailures, nodesState = nodesState)) } .showNodesFailure <- function(nodesWithFailures) { @@ -250,7 +262,3 @@ readMetadataBlob <- function(jobId) { return(NULL) } } - -areShallowEqual <- function(a, b) { - !is.null(a) && !is.null(b) && a == b -} diff --git a/samples/long_running_job/long_running_job.R b/samples/long_running_job/long_running_job.R index 4f7b7247..c98b003d 100644 --- a/samples/long_running_job/long_running_job.R +++ b/samples/long_running_job/long_running_job.R @@ -11,7 +11,7 @@ install_github("azure/doazureparallel") library(doAzureParallel) credentialsFileName <- "credentials.json" -clusterFileName <- "test_cluster.json" +clusterFileName <- "cluster.json" # generate a credentials json file generateCredentialsConfig(credentialsFileName) diff --git a/tests/testthat/test-async-cluster.R b/tests/testthat/test-async-cluster.R index 838a95fc..833b831f 100644 --- a/tests/testthat/test-async-cluster.R +++ b/tests/testthat/test-async-cluster.R @@ -5,7 +5,7 @@ test_that("Async cluster scenario test", { testthat::skip("Live test") testthat::skip_on_travis() credentialsFileName <- "credentials.json" - clusterFileName <- "test_cluster.json" + clusterFileName <- "cluster.json" doAzureParallel::generateCredentialsConfig(credentialsFileName) doAzureParallel::generateClusterConfig(clusterFileName) @@ -13,16 +13,22 @@ test_that("Async cluster scenario test", { # set your credentials doAzureParallel::setCredentials(credentialsFileName) - clusterName <- + cluster <- doAzureParallel::makeCluster(clusterSetting = clusterFileName, wait = FALSE) - while (is.null(getCluster(clusterName))) { - Sys.sleep(30) - } - - cat("\ncluster is ready") - cluster <- getCluster(clusterName) + cluster <- getCluster(cluster$poolId) doAzureParallel::registerDoAzureParallel(cluster) + '%dopar%' <- foreach::'%dopar%' + res <- + foreach::foreach(i = 1:4) %dopar% { + mean(1:3) + } + + res + + testthat::expect_equal(length(res), 4) + testthat::expect_equal(res, list(2, 2, 2, 2)) + stopCluster(cluster) }) diff --git a/tests/testthat/test-live.R b/tests/testthat/test-live.R index d1f492fb..b4d2d4df 100644 --- a/tests/testthat/test-live.R +++ b/tests/testthat/test-live.R @@ -5,7 +5,7 @@ test_that("Basic scenario test", { testthat::skip("Live test") testthat::skip_on_travis() credentialsFileName <- "credentials.json" - clusterFileName <- "test_cluster.json" + clusterFileName <- "cluster.json" doAzureParallel::generateCredentialsConfig(credentialsFileName) doAzureParallel::generateClusterConfig(clusterFileName) diff --git a/tests/testthat/test-set-credentials.R b/tests/testthat/test-set-credentials.R index 45587c95..565d4363 100644 --- a/tests/testthat/test-set-credentials.R +++ b/tests/testthat/test-set-credentials.R @@ -49,7 +49,7 @@ test_that("set credentials/cluster config from Json file scenario test", { testthat::skip_on_travis() credentialsFileName <- "credentials.json" - clusterFileName <- "test_cluster.json" + clusterFileName <- "cluster.json" doAzureParallel::generateCredentialsConfig(credentialsFileName) doAzureParallel::generateClusterConfig(clusterFileName) From c1cc65c15914e251ae5464816aa185f28eec77e4 Mon Sep 17 00:00:00 2001 From: zfengms Date: Fri, 12 Jan 2018 14:44:29 -0800 Subject: [PATCH 2/6] workaround error --- R/cluster.R | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/R/cluster.R b/R/cluster.R index 8e6e683e..05a36cd4 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -322,7 +322,14 @@ makeCluster <- } } - return(getCluster(poolConfig$name)) + cat("Your cluster has been registered.", fill = TRUE) + cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), + fill = TRUE) + cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), + fill = TRUE) + config$poolId <- poolConfig$name + options("az_config" = config) + return(getOption("az_config")) } #' Gets the cluster from your Azure account. @@ -388,7 +395,6 @@ getCluster <- function(clusterName, verbose = TRUE) { cat("Your cluster has been registered.", fill = TRUE) config <- getOption("az_config") - config$nodesState <- nodesState config$targetDedicatedNodes <- pool$targetDedicatedNodes config$targetLowPriorityNodes <- pool$targetLowPriorityNodes cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), @@ -398,7 +404,7 @@ getCluster <- function(clusterName, verbose = TRUE) { config$poolId <- clusterName options("az_config" = config) - return(getOption("az_config")) + return (config) } #' Deletes the cluster from your Azure account. From 739796574a6b06125e584967e97d835ef3e25709 Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 16 Jan 2018 13:54:09 -0800 Subject: [PATCH 3/6] minor fixes --- R/cluster.R | 3 +-- R/utility.R | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/R/cluster.R b/R/cluster.R index 05a36cd4..f5c86cc8 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -255,8 +255,7 @@ makeCluster <- ) } else { stop(sprintf(message, - poolConfig$name), - fill = TRUE) + poolConfig$name)) } } diff --git a/R/utility.R b/R/utility.R index e43a72f8..63d637a3 100644 --- a/R/utility.R +++ b/R/utility.R @@ -94,7 +94,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { ) for (i in 1:length(nodes$value)) { - nodesState[[nodes$value[[i]]$state]] = nodesState[[nodes$value[[i]]$state]] + as.integer(1) + nodesState[[nodes$value[[i]]$state]] <- nodesState[[nodes$value[[i]]$state]] + as.integer(1) # The progress total count is the number of the nodes. Each node counts as 1. # If a node is not in idle, prempted, running, or start task failed, the value is From eec93f1e282d5c1c9f07c78798487c2f0c99184e Mon Sep 17 00:00:00 2001 From: zfengms Date: Tue, 16 Jan 2018 16:22:44 -0800 Subject: [PATCH 4/6] add getClusterList api --- DESCRIPTION | 2 +- NAMESPACE | 1 + R/cluster.R | 71 +++++++++++++++++++++++++++++++++++++++++++ man/getCluster.Rd | 2 +- man/getClusterList.Rd | 19 ++++++++++++ 5 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 man/getClusterList.Rd diff --git a/DESCRIPTION b/DESCRIPTION index 3063924e..5102d952 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -27,5 +27,5 @@ Suggests: caret, plyr, lintr -Remotes: Azure/rAzureBatch@v0.5.4 +Remotes: Azure/rAzureBatch@v0.5.5 RoxygenNote: 6.0.1 diff --git a/NAMESPACE b/NAMESPACE index 3c4950a8..c2bbb1a0 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,6 +8,7 @@ export(generateClusterConfig) export(generateCredentialsConfig) export(getCluster) export(getClusterFile) +export(getClusterList) export(getJob) export(getJobFile) export(getJobList) diff --git a/R/cluster.R b/R/cluster.R index f5c86cc8..d61e56fe 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -406,6 +406,77 @@ getCluster <- function(clusterName, verbose = TRUE) { return (config) } +#' Get a list of clusters by state from the given filter +#' +#' @param filter A filter containing cluster state +#' +#' @examples +#' \dontrun{ +#' getClusterList() +#' } +#' @export +getClusterList <- function(filter = NULL) { + filterClause <- "" + + if (!is.null(filter)) { + if (!is.null(filter$state)) { + for (i in 1:length(filter$state)) { + filterClause <- + paste0(filterClause, + sprintf("state eq '%s'", filter$state[i]), + " or ") + } + + filterClause <- + substr(filterClause, 1, nchar(filterClause) - 3) + } + } + + pools <- + rAzureBatch::listPools( + query = list("$filter" = filterClause, "$select" = "id,state,allocationState,vmSize,currentDedicatedNodes,targetDedicatedNodes,currentLowPriorityNodes,targetLowPriorityNodes") + ) + + count <- length(pools$value) + id <- character(count) + state <- character(count) + allocationState <- character(count) + vmSize <- integer(count) + currentDedicatedNodes <- integer(count) + targetDedicatedNodes <- integer(count) + currentLowPriorityNodes <- integer(count) + targetLowPriorityNodes <- integer(count) + + if (count > 0) { + if (is.null(pools$value[[1]]$id)) { + stop(pools$value) + } + for (j in 1:length(pools$value)) { + id[j] <- pools$value[[j]]$id + state[j] <- pools$value[[j]]$state + allocationState[j] <- pools$value[[j]]$allocationState + vmSize[j] <- pools$value[[j]]$vmSize + currentDedicatedNodes[j] <- pools$value[[j]]$currentDedicatedNodes + targetDedicatedNodes[j] <- pools$value[[j]]$targetDedicatedNodes + currentLowPriorityNodes[j] <- pools$value[[j]]$currentLowPriorityNodes + targetLowPriorityNodes[j] <- pools$value[[j]]$targetLowPriorityNodes + } + } + + return ( + data.frame( + Id = id, + State = state, + AllocationState = allocationState, + VmSize = vmSize, + CurrentDedicatedNodes = currentDedicatedNodes, + targetDedicatedNodes = targetDedicatedNodes, + currentLowPriorityNodes = currentLowPriorityNodes, + targetLowPriorityNodes = targetLowPriorityNodes + ) + ) +} + #' Deletes the cluster from your Azure account. #' #' @param cluster The cluster configuration that was created in \code{makeCluster} diff --git a/man/getCluster.Rd b/man/getCluster.Rd index aa4546ea..3053a263 100644 --- a/man/getCluster.Rd +++ b/man/getCluster.Rd @@ -4,7 +4,7 @@ \alias{getCluster} \title{Gets the cluster from your Azure account.} \usage{ -getCluster(clusterName) +getCluster(clusterName, verbose = TRUE) } \arguments{ \item{clusterName}{The cluster configuration that was created in \code{makeCluster}} diff --git a/man/getClusterList.Rd b/man/getClusterList.Rd new file mode 100644 index 00000000..605a1149 --- /dev/null +++ b/man/getClusterList.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/cluster.R +\name{getClusterList} +\alias{getClusterList} +\title{Get a list of clusters by state from the given filter} +\usage{ +getClusterList(filter = NULL) +} +\arguments{ +\item{filter}{A filter containing cluster state} +} +\description{ +Get a list of clusters by state from the given filter +} +\examples{ +\dontrun{ +getClusterList() +} +} From 8e0681e0d6e18144e2f086f289361373e12bfdc0 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 17 Jan 2018 12:48:57 -0800 Subject: [PATCH 5/6] add metadata to pool indicating pool is created by doAzureParallel --- R/cluster.R | 6 +++++- R/helpers.R | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/R/cluster.R b/R/cluster.R index d61e56fe..42f34c03 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -434,7 +434,11 @@ getClusterList <- function(filter = NULL) { pools <- rAzureBatch::listPools( - query = list("$filter" = filterClause, "$select" = "id,state,allocationState,vmSize,currentDedicatedNodes,targetDedicatedNodes,currentLowPriorityNodes,targetLowPriorityNodes") + query = list( + "$filter" = filterClause, + "$select" = "id,state,allocationState,vmSize,currentDedicatedNodes" + + ",targetDedicatedNodes,currentLowPriorityNodes,targetLowPriorityNodes" + ) ) count <- length(pools$value) diff --git a/R/helpers.R b/R/helpers.R index b089faf7..27178ed2 100644 --- a/R/helpers.R +++ b/R/helpers.R @@ -248,6 +248,7 @@ startTask = startTask, virtualMachineConfiguration = virtualMachineConfiguration, enableAutoScale = TRUE, + metadata = list(list(name = "origin", value = "doAzureParallel")), autoscaleFormula = getAutoscaleFormula( pool$poolSize$autoscaleFormula, pool$poolSize$dedicatedNodes$min, From 3b4c3e8d82f8ab500dd1d0c77a8882f22126d947 Mon Sep 17 00:00:00 2001 From: zfengms Date: Wed, 17 Jan 2018 17:40:32 -0800 Subject: [PATCH 6/6] add test for getClusterList(), add 'other' state for nodes for getCluster() --- R/cluster.R | 1 + R/utility.R | 10 ++++++++-- tests/testthat/test-async-cluster.R | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/R/cluster.R b/R/cluster.R index 42f34c03..51e4f3a1 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -386,6 +386,7 @@ getCluster <- function(clusterName, verbose = TRUE) { cat(sprintf("\tstarttaskfailed: %s", nodesState$starttaskfailed), fill = TRUE) cat(sprintf("\tpreempted: %s", nodesState$preempted), fill = TRUE) cat(sprintf("\trunning: %s", nodesState$running), fill = TRUE) + cat(sprintf("\tother: %s", nodesState$other), fill = TRUE) } .showNodesFailure(nodesWithFailures) diff --git a/R/utility.R b/R/utility.R index 63d637a3..86942eb1 100644 --- a/R/utility.R +++ b/R/utility.R @@ -90,11 +90,17 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { waitingforstarttask = as.integer(0), starttaskfailed = as.integer(0), preempted = as.integer(0), - running = as.integer(0) + running = as.integer(0), + other = as.integer(0) ) for (i in 1:length(nodes$value)) { - nodesState[[nodes$value[[i]]$state]] <- nodesState[[nodes$value[[i]]$state]] + as.integer(1) + state <- nodes$value[[i]]$state + if (is.null(nodesState[[state]])) { + nodesState[["other"]] <- nodesState[["other"]] + 1 + } else { + nodesState[[state]] <- nodesState[[state]] + as.integer(1) + } # The progress total count is the number of the nodes. Each node counts as 1. # If a node is not in idle, prempted, running, or start task failed, the value is diff --git a/tests/testthat/test-async-cluster.R b/tests/testthat/test-async-cluster.R index 833b831f..020f98e8 100644 --- a/tests/testthat/test-async-cluster.R +++ b/tests/testthat/test-async-cluster.R @@ -17,6 +17,10 @@ test_that("Async cluster scenario test", { doAzureParallel::makeCluster(clusterSetting = clusterFileName, wait = FALSE) cluster <- getCluster(cluster$poolId) + getClusterList() + filter <- filter <- list() + filter$state <- c("active", "deleting") + getClusterList(filter) doAzureParallel::registerDoAzureParallel(cluster) '%dopar%' <- foreach::'%dopar%'