diff --git a/DESCRIPTION b/DESCRIPTION index 3063924e..5102d952 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -27,5 +27,5 @@ Suggests: caret, plyr, lintr -Remotes: Azure/rAzureBatch@v0.5.4 +Remotes: Azure/rAzureBatch@v0.5.5 RoxygenNote: 6.0.1 diff --git a/NAMESPACE b/NAMESPACE index 3c4950a8..c2bbb1a0 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,6 +8,7 @@ export(generateClusterConfig) export(generateCredentialsConfig) export(getCluster) export(getClusterFile) +export(getClusterList) export(getJob) export(getJobFile) export(getJobList) diff --git a/R/cluster.R b/R/cluster.R index ff3bb56f..51e4f3a1 100644 --- a/R/cluster.R +++ b/R/cluster.R @@ -239,8 +239,7 @@ makeCluster <- poolConfig$name), fill = TRUE) - while (areShallowEqual(rAzureBatch::getPool(poolConfig$name)$state, - "deleting")) { + while (rAzureBatch::getPool(poolConfig$name)$state == "deleting") { cat(".") Sys.sleep(10) } @@ -256,8 +255,7 @@ makeCluster <- ) } else { stop(sprintf(message, - poolConfig$name), - fill = TRUE) + poolConfig$name)) } } @@ -321,26 +319,16 @@ makeCluster <- if (!grepl("PoolExists", response)) { waitForNodesToComplete(poolConfig$name, 60000) } - - cat("Your cluster has been registered.", fill = TRUE) - cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), - fill = TRUE) - cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), - fill = TRUE) - - config$poolId <- poolConfig$name - options("az_config" = config) - return(getOption("az_config")) - } else { - print( - paste0( - "Because the 'wait' parameter is set to FALSE, the returned value is cluster name ", - "Use this returned value with getCluster(clusterName) to get the cluster when the ", - "cluster is created in Azure" - ) - ) - return (poolConfig$name) } + + cat("Your cluster has been registered.", fill = TRUE) + cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), + fill = TRUE) + cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), + fill = TRUE) + config$poolId <- poolConfig$name + options("az_config" = config) + return(getOption("az_config")) } #' Gets the cluster from your Azure account. @@ -352,7 +340,7 @@ makeCluster <- #' cluster <- getCluster("myCluster") #' } #' @export -getCluster <- function(clusterName) { +getCluster <- function(clusterName, verbose = TRUE) { pool <- rAzureBatch::getPool(clusterName) if (!is.null(pool$code) && !is.null(pool$message)) { @@ -360,12 +348,9 @@ getCluster <- function(clusterName) { } if (pool$targetDedicatedNodes + pool$targetLowPriorityNodes <= 0) { - stop("Pool count needs to be greater than 0.") + stop("Cluster node count needs to be greater than 0.") } - totalNodes <- - pool$targetDedicatedNodes + pool$targetLowPriorityNodes - if (!is.null(pool$resizeErrors)) { cat("\n") @@ -387,31 +372,114 @@ getCluster <- function(clusterName) { nodes <- rAzureBatch::listPoolNodes(clusterName) - currentNodeCount <- 0 if (!is.null(nodes$value) && length(nodes$value) > 0) { - nodesStatus <- .processNodeCount(nodes) - - currentNodeCount <- nodesStatus$currentNodeCount - nodesWithFailures <- nodesStatus$nodesWithFailures + nodesInfo <- .processNodeCount(nodes) + nodesState <- nodesInfo$nodesState + nodesWithFailures <- nodesInfo$nodesWithFailures + + if (verbose == TRUE) { + cat("\nnodes:", fill = TRUE) + cat(sprintf("\tidle: %s", nodesState$idle), fill = TRUE) + cat(sprintf("\tcreating: %s", nodesState$creating), fill = TRUE) + cat(sprintf("\tstarting: %s", nodesState$starting), fill = TRUE) + cat(sprintf("\twaitingforstarttask: %s", nodesState$waitingforstarttask), fill = TRUE) + cat(sprintf("\tstarttaskfailed: %s", nodesState$starttaskfailed), fill = TRUE) + cat(sprintf("\tpreempted: %s", nodesState$preempted), fill = TRUE) + cat(sprintf("\trunning: %s", nodesState$running), fill = TRUE) + cat(sprintf("\tother: %s", nodesState$other), fill = TRUE) + } .showNodesFailure(nodesWithFailures) } - if (currentNodeCount >= totalNodes) { - config <- getOption("az_config") - cat("Your cluster has been registered.", fill = TRUE) - cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), - fill = TRUE) - cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), - fill = TRUE) + cat("Your cluster has been registered.", fill = TRUE) - config$poolId <- clusterName - options("az_config" = config) - return(getOption("az_config")) - } else { - cat("Your cluster is not ready yet.", fill = TRUE) - return (NULL) + config <- getOption("az_config") + config$targetDedicatedNodes <- pool$targetDedicatedNodes + config$targetLowPriorityNodes <- pool$targetLowPriorityNodes + cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes), + fill = TRUE) + cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes), + fill = TRUE) + + config$poolId <- clusterName + options("az_config" = config) + return (config) +} + +#' Get a list of clusters by state from the given filter +#' +#' @param filter A filter containing cluster state +#' +#' @examples +#' \dontrun{ +#' getClusterList() +#' } +#' @export +getClusterList <- function(filter = NULL) { + filterClause <- "" + + if (!is.null(filter)) { + if (!is.null(filter$state)) { + for (i in 1:length(filter$state)) { + filterClause <- + paste0(filterClause, + sprintf("state eq '%s'", filter$state[i]), + " or ") + } + + filterClause <- + substr(filterClause, 1, nchar(filterClause) - 3) + } } + + pools <- + rAzureBatch::listPools( + query = list( + "$filter" = filterClause, + "$select" = "id,state,allocationState,vmSize,currentDedicatedNodes" + + ",targetDedicatedNodes,currentLowPriorityNodes,targetLowPriorityNodes" + ) + ) + + count <- length(pools$value) + id <- character(count) + state <- character(count) + allocationState <- character(count) + vmSize <- integer(count) + currentDedicatedNodes <- integer(count) + targetDedicatedNodes <- integer(count) + currentLowPriorityNodes <- integer(count) + targetLowPriorityNodes <- integer(count) + + if (count > 0) { + if (is.null(pools$value[[1]]$id)) { + stop(pools$value) + } + for (j in 1:length(pools$value)) { + id[j] <- pools$value[[j]]$id + state[j] <- pools$value[[j]]$state + allocationState[j] <- pools$value[[j]]$allocationState + vmSize[j] <- pools$value[[j]]$vmSize + currentDedicatedNodes[j] <- pools$value[[j]]$currentDedicatedNodes + targetDedicatedNodes[j] <- pools$value[[j]]$targetDedicatedNodes + currentLowPriorityNodes[j] <- pools$value[[j]]$currentLowPriorityNodes + targetLowPriorityNodes[j] <- pools$value[[j]]$targetLowPriorityNodes + } + } + + return ( + data.frame( + Id = id, + State = state, + AllocationState = allocationState, + VmSize = vmSize, + CurrentDedicatedNodes = currentDedicatedNodes, + targetDedicatedNodes = targetDedicatedNodes, + currentLowPriorityNodes = currentLowPriorityNodes, + targetLowPriorityNodes = targetLowPriorityNodes + ) + ) } #' Deletes the cluster from your Azure account. diff --git a/R/helpers.R b/R/helpers.R index b089faf7..27178ed2 100644 --- a/R/helpers.R +++ b/R/helpers.R @@ -248,6 +248,7 @@ startTask = startTask, virtualMachineConfiguration = virtualMachineConfiguration, enableAutoScale = TRUE, + metadata = list(list(name = "origin", value = "doAzureParallel")), autoscaleFormula = getAutoscaleFormula( pool$poolSize$autoscaleFormula, pool$poolSize$dedicatedNodes$min, diff --git a/R/utility.R b/R/utility.R index c6d1cd04..86942eb1 100644 --- a/R/utility.R +++ b/R/utility.R @@ -56,10 +56,10 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { nodes <- rAzureBatch::listPoolNodes(poolId) if (!is.null(nodes$value) && length(nodes$value) > 0) { - nodesStatus <- .processNodeCount(nodes) + nodesInfo <- .processNodeCount(nodes) - currentProgressBarCount <- nodesStatus$currentNodeCount - nodesWithFailures <- nodesStatus$nodesWithFailures + currentProgressBarCount <- nodesInfo$currentNodeCount + nodesWithFailures <- nodesInfo$nodesWithFailures if (currentProgressBarCount >= pb$getVal()) { setTxtProgressBar(pb, currentProgressBarCount) @@ -83,7 +83,25 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { .processNodeCount <- function(nodes) { nodesWithFailures <- c() currentNodeCount <- 0 + nodesState <- list( + idle = as.integer(0), + creating = as.integer(0), + starting = as.integer(0), + waitingforstarttask = as.integer(0), + starttaskfailed = as.integer(0), + preempted = as.integer(0), + running = as.integer(0), + other = as.integer(0) + ) + for (i in 1:length(nodes$value)) { + state <- nodes$value[[i]]$state + if (is.null(nodesState[[state]])) { + nodesState[["other"]] <- nodesState[["other"]] + 1 + } else { + nodesState[[state]] <- nodesState[[state]] + as.integer(1) + } + # The progress total count is the number of the nodes. Each node counts as 1. # If a node is not in idle, prempted, running, or start task failed, the value is # less than 1. The default value is 0 because the node has not been allocated to @@ -99,7 +117,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { "starting" = { 0.50 }, - "waitingforstartask" = { + "waitingforstarttask" = { 0.75 }, "starttaskfailed" = { @@ -118,7 +136,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) { currentNodeCount <- currentNodeCount + nodeValue } - return(list(currentNodeCount = currentNodeCount, nodesWithFailures = nodesWithFailures)) + return(list(currentNodeCount = currentNodeCount, nodesWithFailures = nodesWithFailures, nodesState = nodesState)) } .showNodesFailure <- function(nodesWithFailures) { @@ -250,7 +268,3 @@ readMetadataBlob <- function(jobId) { return(NULL) } } - -areShallowEqual <- function(a, b) { - !is.null(a) && !is.null(b) && a == b -} diff --git a/man/getCluster.Rd b/man/getCluster.Rd index aa4546ea..3053a263 100644 --- a/man/getCluster.Rd +++ b/man/getCluster.Rd @@ -4,7 +4,7 @@ \alias{getCluster} \title{Gets the cluster from your Azure account.} \usage{ -getCluster(clusterName) +getCluster(clusterName, verbose = TRUE) } \arguments{ \item{clusterName}{The cluster configuration that was created in \code{makeCluster}} diff --git a/man/getClusterList.Rd b/man/getClusterList.Rd new file mode 100644 index 00000000..605a1149 --- /dev/null +++ b/man/getClusterList.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/cluster.R +\name{getClusterList} +\alias{getClusterList} +\title{Get a list of clusters by state from the given filter} +\usage{ +getClusterList(filter = NULL) +} +\arguments{ +\item{filter}{A filter containing cluster state} +} +\description{ +Get a list of clusters by state from the given filter +} +\examples{ +\dontrun{ +getClusterList() +} +} diff --git a/samples/long_running_job/long_running_job.R b/samples/long_running_job/long_running_job.R index 4f7b7247..c98b003d 100644 --- a/samples/long_running_job/long_running_job.R +++ b/samples/long_running_job/long_running_job.R @@ -11,7 +11,7 @@ install_github("azure/doazureparallel") library(doAzureParallel) credentialsFileName <- "credentials.json" -clusterFileName <- "test_cluster.json" +clusterFileName <- "cluster.json" # generate a credentials json file generateCredentialsConfig(credentialsFileName) diff --git a/tests/testthat/test-async-cluster.R b/tests/testthat/test-async-cluster.R index 838a95fc..020f98e8 100644 --- a/tests/testthat/test-async-cluster.R +++ b/tests/testthat/test-async-cluster.R @@ -5,7 +5,7 @@ test_that("Async cluster scenario test", { testthat::skip("Live test") testthat::skip_on_travis() credentialsFileName <- "credentials.json" - clusterFileName <- "test_cluster.json" + clusterFileName <- "cluster.json" doAzureParallel::generateCredentialsConfig(credentialsFileName) doAzureParallel::generateClusterConfig(clusterFileName) @@ -13,16 +13,26 @@ test_that("Async cluster scenario test", { # set your credentials doAzureParallel::setCredentials(credentialsFileName) - clusterName <- + cluster <- doAzureParallel::makeCluster(clusterSetting = clusterFileName, wait = FALSE) - while (is.null(getCluster(clusterName))) { - Sys.sleep(30) - } - - cat("\ncluster is ready") - cluster <- getCluster(clusterName) + cluster <- getCluster(cluster$poolId) + getClusterList() + filter <- filter <- list() + filter$state <- c("active", "deleting") + getClusterList(filter) doAzureParallel::registerDoAzureParallel(cluster) + '%dopar%' <- foreach::'%dopar%' + res <- + foreach::foreach(i = 1:4) %dopar% { + mean(1:3) + } + + res + + testthat::expect_equal(length(res), 4) + testthat::expect_equal(res, list(2, 2, 2, 2)) + stopCluster(cluster) }) diff --git a/tests/testthat/test-live.R b/tests/testthat/test-live.R index d1f492fb..b4d2d4df 100644 --- a/tests/testthat/test-live.R +++ b/tests/testthat/test-live.R @@ -5,7 +5,7 @@ test_that("Basic scenario test", { testthat::skip("Live test") testthat::skip_on_travis() credentialsFileName <- "credentials.json" - clusterFileName <- "test_cluster.json" + clusterFileName <- "cluster.json" doAzureParallel::generateCredentialsConfig(credentialsFileName) doAzureParallel::generateClusterConfig(clusterFileName) diff --git a/tests/testthat/test-set-credentials.R b/tests/testthat/test-set-credentials.R index 45587c95..565d4363 100644 --- a/tests/testthat/test-set-credentials.R +++ b/tests/testthat/test-set-credentials.R @@ -49,7 +49,7 @@ test_that("set credentials/cluster config from Json file scenario test", { testthat::skip_on_travis() credentialsFileName <- "credentials.json" - clusterFileName <- "test_cluster.json" + clusterFileName <- "cluster.json" doAzureParallel::generateCredentialsConfig(credentialsFileName) doAzureParallel::generateClusterConfig(clusterFileName)