Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ Suggests:
caret,
plyr,
lintr
Remotes: Azure/rAzureBatch@v0.5.4
Remotes: Azure/rAzureBatch@v0.5.5
RoxygenNote: 6.0.1
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export(generateClusterConfig)
export(generateCredentialsConfig)
export(getCluster)
export(getClusterFile)
export(getClusterList)
export(getJob)
export(getJobFile)
export(getJobList)
Expand Down
160 changes: 114 additions & 46 deletions R/cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ makeCluster <-
poolConfig$name),
fill = TRUE)

while (areShallowEqual(rAzureBatch::getPool(poolConfig$name)$state,
"deleting")) {
while (rAzureBatch::getPool(poolConfig$name)$state == "deleting") {
cat(".")
Sys.sleep(10)
}
Expand All @@ -256,8 +255,7 @@ makeCluster <-
)
} else {
stop(sprintf(message,
poolConfig$name),
fill = TRUE)
poolConfig$name))
}
}

Expand Down Expand Up @@ -321,26 +319,16 @@ makeCluster <-
if (!grepl("PoolExists", response)) {
waitForNodesToComplete(poolConfig$name, 60000)
}

cat("Your cluster has been registered.", fill = TRUE)
cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes),
fill = TRUE)
cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes),
fill = TRUE)

config$poolId <- poolConfig$name
options("az_config" = config)
return(getOption("az_config"))
} else {
print(
paste0(
"Because the 'wait' parameter is set to FALSE, the returned value is cluster name ",
"Use this returned value with getCluster(clusterName) to get the cluster when the ",
"cluster is created in Azure"
)
)
return (poolConfig$name)
}

cat("Your cluster has been registered.", fill = TRUE)
cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes),
fill = TRUE)
cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes),
fill = TRUE)
config$poolId <- poolConfig$name
options("az_config" = config)
return(getOption("az_config"))
}

#' Gets the cluster from your Azure account.
Expand All @@ -352,20 +340,17 @@ makeCluster <-
#' cluster <- getCluster("myCluster")
#' }
#' @export
getCluster <- function(clusterName) {
getCluster <- function(clusterName, verbose = TRUE) {
pool <- rAzureBatch::getPool(clusterName)

if (!is.null(pool$code) && !is.null(pool$message)) {
stop(sprintf("Code: %s - Message: %s", pool$code, pool$message))
}

if (pool$targetDedicatedNodes + pool$targetLowPriorityNodes <= 0) {
stop("Pool count needs to be greater than 0.")
stop("Cluster node count needs to be greater than 0.")
}

totalNodes <-
pool$targetDedicatedNodes + pool$targetLowPriorityNodes

if (!is.null(pool$resizeErrors)) {
cat("\n")

Expand All @@ -387,31 +372,114 @@ getCluster <- function(clusterName) {

nodes <- rAzureBatch::listPoolNodes(clusterName)

currentNodeCount <- 0
if (!is.null(nodes$value) && length(nodes$value) > 0) {
nodesStatus <- .processNodeCount(nodes)

currentNodeCount <- nodesStatus$currentNodeCount
nodesWithFailures <- nodesStatus$nodesWithFailures
nodesInfo <- .processNodeCount(nodes)
nodesState <- nodesInfo$nodesState
nodesWithFailures <- nodesInfo$nodesWithFailures

if (verbose == TRUE) {
cat("\nnodes:", fill = TRUE)
cat(sprintf("\tidle: %s", nodesState$idle), fill = TRUE)
cat(sprintf("\tcreating: %s", nodesState$creating), fill = TRUE)
cat(sprintf("\tstarting: %s", nodesState$starting), fill = TRUE)
cat(sprintf("\twaitingforstarttask: %s", nodesState$waitingforstarttask), fill = TRUE)
cat(sprintf("\tstarttaskfailed: %s", nodesState$starttaskfailed), fill = TRUE)
cat(sprintf("\tpreempted: %s", nodesState$preempted), fill = TRUE)
cat(sprintf("\trunning: %s", nodesState$running), fill = TRUE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth adding and 'other' state for all other states, since this could cause confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 'other' state

cat(sprintf("\tother: %s", nodesState$other), fill = TRUE)
}

.showNodesFailure(nodesWithFailures)
}

if (currentNodeCount >= totalNodes) {
config <- getOption("az_config")
cat("Your cluster has been registered.", fill = TRUE)
cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes),
fill = TRUE)
cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes),
fill = TRUE)
cat("Your cluster has been registered.", fill = TRUE)

config$poolId <- clusterName
options("az_config" = config)
return(getOption("az_config"))
} else {
cat("Your cluster is not ready yet.", fill = TRUE)
return (NULL)
config <- getOption("az_config")
config$targetDedicatedNodes <- pool$targetDedicatedNodes
config$targetLowPriorityNodes <- pool$targetLowPriorityNodes
cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes),
fill = TRUE)
cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes),
fill = TRUE)

config$poolId <- clusterName
options("az_config" = config)
return (config)
}

#' Get a list of clusters by state from the given filter
#'
#' @param filter A filter containing cluster state
#'
#' @examples
#' \dontrun{
#' getClusterList()
#' }
#' @export
getClusterList <- function(filter = NULL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some tests for this? I didn't see any below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test in test-async-cluster.R

filterClause <- ""

if (!is.null(filter)) {
if (!is.null(filter$state)) {
for (i in 1:length(filter$state)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 'filter' a list or a vector? If so, can't se just do

filterClause <- paste(filter, " or ")

instead of this long for loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that won't work, filter is a list of vectors, for example:
filter <- filter <- list()
filter$state <- c("active", "deleting")
getClusterList(filter)
paste(filter, " or ") will produce "c('active', 'deleting') or " which is not a valid clause

filterClause <-
paste0(filterClause,
sprintf("state eq '%s'", filter$state[i]),
" or ")
}

filterClause <-
substr(filterClause, 1, nchar(filterClause) - 3)
}
}

pools <-
rAzureBatch::listPools(
query = list(
"$filter" = filterClause,
"$select" = "id,state,allocationState,vmSize,currentDedicatedNodes" +
",targetDedicatedNodes,currentLowPriorityNodes,targetLowPriorityNodes"
)
)

count <- length(pools$value)
id <- character(count)
state <- character(count)
allocationState <- character(count)
vmSize <- integer(count)
currentDedicatedNodes <- integer(count)
targetDedicatedNodes <- integer(count)
currentLowPriorityNodes <- integer(count)
targetLowPriorityNodes <- integer(count)

if (count > 0) {
if (is.null(pools$value[[1]]$id)) {
stop(pools$value)
}
for (j in 1:length(pools$value)) {
id[j] <- pools$value[[j]]$id
state[j] <- pools$value[[j]]$state
allocationState[j] <- pools$value[[j]]$allocationState
vmSize[j] <- pools$value[[j]]$vmSize
currentDedicatedNodes[j] <- pools$value[[j]]$currentDedicatedNodes
targetDedicatedNodes[j] <- pools$value[[j]]$targetDedicatedNodes
currentLowPriorityNodes[j] <- pools$value[[j]]$currentLowPriorityNodes
targetLowPriorityNodes[j] <- pools$value[[j]]$targetLowPriorityNodes
}
}

return (
data.frame(
Id = id,
State = state,
AllocationState = allocationState,
VmSize = vmSize,
CurrentDedicatedNodes = currentDedicatedNodes,
targetDedicatedNodes = targetDedicatedNodes,
currentLowPriorityNodes = currentLowPriorityNodes,
targetLowPriorityNodes = targetLowPriorityNodes
)
)
}

#' Deletes the cluster from your Azure account.
Expand Down
1 change: 1 addition & 0 deletions R/helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@
startTask = startTask,
virtualMachineConfiguration = virtualMachineConfiguration,
enableAutoScale = TRUE,
metadata = list(list(name = "origin", value = "doAzureParallel")),
autoscaleFormula = getAutoscaleFormula(
pool$poolSize$autoscaleFormula,
pool$poolSize$dedicatedNodes$min,
Expand Down
32 changes: 23 additions & 9 deletions R/utility.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) {
nodes <- rAzureBatch::listPoolNodes(poolId)

if (!is.null(nodes$value) && length(nodes$value) > 0) {
nodesStatus <- .processNodeCount(nodes)
nodesInfo <- .processNodeCount(nodes)

currentProgressBarCount <- nodesStatus$currentNodeCount
nodesWithFailures <- nodesStatus$nodesWithFailures
currentProgressBarCount <- nodesInfo$currentNodeCount
nodesWithFailures <- nodesInfo$nodesWithFailures

if (currentProgressBarCount >= pb$getVal()) {
setTxtProgressBar(pb, currentProgressBarCount)
Expand All @@ -83,7 +83,25 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) {
.processNodeCount <- function(nodes) {
nodesWithFailures <- c()
currentNodeCount <- 0
nodesState <- list(
idle = as.integer(0),
creating = as.integer(0),
starting = as.integer(0),
waitingforstarttask = as.integer(0),
starttaskfailed = as.integer(0),
preempted = as.integer(0),
running = as.integer(0),
other = as.integer(0)
)

for (i in 1:length(nodes$value)) {
state <- nodes$value[[i]]$state
if (is.null(nodesState[[state]])) {
nodesState[["other"]] <- nodesState[["other"]] + 1
} else {
nodesState[[state]] <- nodesState[[state]] + as.integer(1)
}

# The progress total count is the number of the nodes. Each node counts as 1.
# If a node is not in idle, prempted, running, or start task failed, the value is
# less than 1. The default value is 0 because the node has not been allocated to
Expand All @@ -99,7 +117,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) {
"starting" = {
0.50
},
"waitingforstartask" = {
"waitingforstarttask" = {
0.75
},
"starttaskfailed" = {
Expand All @@ -118,7 +136,7 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) {
currentNodeCount <-
currentNodeCount + nodeValue
}
return(list(currentNodeCount = currentNodeCount, nodesWithFailures = nodesWithFailures))
return(list(currentNodeCount = currentNodeCount, nodesWithFailures = nodesWithFailures, nodesState = nodesState))
}

.showNodesFailure <- function(nodesWithFailures) {
Expand Down Expand Up @@ -250,7 +268,3 @@ readMetadataBlob <- function(jobId) {
return(NULL)
}
}

areShallowEqual <- function(a, b) {
!is.null(a) && !is.null(b) && a == b
}
2 changes: 1 addition & 1 deletion man/getCluster.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions man/getClusterList.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion samples/long_running_job/long_running_job.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ install_github("azure/doazureparallel")
library(doAzureParallel)

credentialsFileName <- "credentials.json"
clusterFileName <- "test_cluster.json"
clusterFileName <- "cluster.json"

# generate a credentials json file
generateCredentialsConfig(credentialsFileName)
Expand Down
26 changes: 18 additions & 8 deletions tests/testthat/test-async-cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,34 @@ test_that("Async cluster scenario test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "test_cluster.json"
clusterFileName <- "cluster.json"

doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)

# set your credentials
doAzureParallel::setCredentials(credentialsFileName)

clusterName <-
cluster <-
doAzureParallel::makeCluster(clusterSetting = clusterFileName, wait = FALSE)

while (is.null(getCluster(clusterName))) {
Sys.sleep(30)
}

cat("\ncluster is ready")
cluster <- getCluster(clusterName)
cluster <- getCluster(cluster$poolId)
getClusterList()
filter <- filter <- list()
filter$state <- c("active", "deleting")
getClusterList(filter)
doAzureParallel::registerDoAzureParallel(cluster)

'%dopar%' <- foreach::'%dopar%'
res <-
foreach::foreach(i = 1:4) %dopar% {
mean(1:3)
}

res

testthat::expect_equal(length(res), 4)
testthat::expect_equal(res, list(2, 2, 2, 2))

stopCluster(cluster)
})
2 changes: 1 addition & 1 deletion tests/testthat/test-live.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ test_that("Basic scenario test", {
testthat::skip("Live test")
testthat::skip_on_travis()
credentialsFileName <- "credentials.json"
clusterFileName <- "test_cluster.json"
clusterFileName <- "cluster.json"

doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-set-credentials.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ test_that("set credentials/cluster config from Json file scenario test", {
testthat::skip_on_travis()

credentialsFileName <- "credentials.json"
clusterFileName <- "test_cluster.json"
clusterFileName <- "cluster.json"

doAzureParallel::generateCredentialsConfig(credentialsFileName)
doAzureParallel::generateClusterConfig(clusterFileName)
Expand Down