Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ private[yarn] class YarnAllocator(
}

// Visible for testing.
val allocatedHostToContainersMap =
new HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedContainerToHostMap = new HashMap[ContainerId, String]

// Containers that we no longer care about. We've either already told the RM to release them or
Expand All @@ -84,7 +83,7 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0

@volatile private var maxExecutors = args.numExecutors
@volatile private var targetNumExecutors = args.numExecutors

// Keep track of which container is running which executor to remove the executors later
private val executorIdToContainer = new HashMap[String, Container]
Expand Down Expand Up @@ -133,10 +132,12 @@ private[yarn] class YarnAllocator(
amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum

/**
* Request as many executors from the ResourceManager as needed to reach the desired total.
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will
* be killed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this true based on your code down there? If there are 10 executors running and 0 pending, and I set targetNumExecutors to 0, won't it find 10 matching requests and kill them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind, I guess the semantics of amClient.removeContainerRequest refers only to the request itself, but not allocated containers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,amClient.removeContainerRequest is only to remove requests, not allocated containers.

*/
def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
maxExecutors = requestedTotal
targetNumExecutors = requestedTotal
}

/**
Expand All @@ -147,8 +148,8 @@ private[yarn] class YarnAllocator(
val container = executorIdToContainer.remove(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
maxExecutors -= 1
assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
targetNumExecutors -= 1
assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
Expand All @@ -163,15 +164,8 @@ private[yarn] class YarnAllocator(
* This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
val numPendingAllocate = getNumPendingAllocate
val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
updateResourceRequests()

if (missing > 0) {
logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
}

addResourceRequests(missing)
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
Expand Down Expand Up @@ -201,15 +195,36 @@ private[yarn] class YarnAllocator(
}

/**
* Request numExecutors additional containers from YARN. Visible for testing.
* Update the set of container requests that we will sync with the RM based on the number of
* executors we have currently running and our target number of executors.
*
* Visible for testing.
*/
def addResourceRequests(numExecutors: Int): Unit = {
for (i <- 0 until numExecutors) {
val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
logInfo("Container request (host: %s, capability: %s".format(hostStr, resource))
def updateResourceRequests(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about making it private[yarn]? will still be visible in tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YarnAllocator is already private[yarn], so this will end up private[yarn] already.

val numPendingAllocate = getNumPendingAllocate
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning

if (missing > 0) {
logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")

for (i <- 0 until missing) {
val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
logInfo(s"Container request (host: $hostStr, capability: $resource)")
}
} else if (missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor containers")

val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
}
}

Expand Down Expand Up @@ -266,7 +281,7 @@ private[yarn] class YarnAllocator(
* containersToUse or remaining.
*
* @param allocatedContainer container that was given to us by YARN
* @location resource name, either a node, rack, or *
* @param location resource name, either a node, rack, or *
* @param containersToUse list of containers that will be used
* @param remaining list of containers that will not be used
*/
Expand Down Expand Up @@ -294,7 +309,7 @@ private[yarn] class YarnAllocator(
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
numExecutorsRunning += 1
assert(numExecutorsRunning <= maxExecutors)
assert(numExecutorsRunning <= targetNumExecutors)
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
executorIdCounter += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach

test("single container allocated") {
// request a single container and receive it
val handler = createAllocator()
handler.addResourceRequests(1)
val handler = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (1)

Expand All @@ -123,8 +123,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach

test("some containers allocated") {
// request a few containers and receive some of them
val handler = createAllocator()
handler.addResourceRequests(4)
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)

Expand All @@ -144,7 +144,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach

test("receive more containers than requested") {
val handler = createAllocator(2)
handler.addResourceRequests(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (2)

Expand All @@ -162,6 +162,50 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
handler.allocatedHostToContainersMap.contains("host4") should be (false)
}

test("decrease total requested executors") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)

handler.requestTotalExecutors(3)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (3)

val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))

handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)

handler.requestTotalExecutors(2)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (1)
}

test("decrease total requested executors to less than currently running") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)

handler.requestTotalExecutors(3)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (3)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))

handler.getNumExecutorsRunning should be (2)

handler.requestTotalExecutors(1)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (0)
handler.getNumExecutorsRunning should be (2)
}

test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
Expand Down