 
 package org.apache.spark.deploy.yarn
 
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
@@ -119,8 +120,8 @@ private[yarn] class YarnAllocator(
   @GuardedBy("this")
   private val containerIdToExecutorIdAndResourceProfileId = new HashMap[ContainerId, (String, Int)]
 
-  @GuardedBy("this")
-  private[yarn] val rpIdToYarnResource = new mutable.HashMap[Int, Resource]
+  @GuardedBy("ConcurrentHashMap")
+  private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
 
   // note currently we don't remove ResourceProfiles
   @GuardedBy("this")
@@ -210,7 +211,7 @@ private[yarn] class YarnAllocator(
     numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
     targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
       SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
-    rpIdToYarnResource(DEFAULT_RESOURCE_PROFILE_ID) = defaultResource
+    rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
     rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
       ResourceProfile.getOrCreateDefaultProfile(sparkConf)
   }
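
The java.util.concurrent.ConcurrentHashMap that replaces the mutable.HashMap above behaves differently in two ways the rest of the diff relies on: writes go through put/putIfAbsent instead of Scala update syntax, and get returns null for a missing key rather than throwing. A minimal standalone sketch of those semantics (the map name and string values here are illustrative, not taken from this patch):

import java.util.concurrent.ConcurrentHashMap

object RpResourceMapSketch {
  def main(args: Array[String]): Unit = {
    val rpIdToResource = new ConcurrentHashMap[Int, String]()
    rpIdToResource.put(0, "default-resource")        // unconditional insert or overwrite
    rpIdToResource.putIfAbsent(0, "ignored")         // no-op: key 0 already has a mapping
    rpIdToResource.putIfAbsent(1, "custom-resource") // inserts: key 1 was absent
    println(rpIdToResource.get(0))                   // prints "default-resource"
    println(rpIdToResource.get(42))                  // prints "null": no exception,
                                                     // unlike mutable.HashMap's apply()
  }
}
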
@@ -289,7 +290,8 @@ private[yarn] class YarnAllocator(
   private def getPendingAtLocation(
       location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
     val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
-    rpIdToYarnResource.map { case (id, profResource) =>
+    rpIdToResourceProfile.keys.map { id =>
+      val profResource = rpIdToYarnResource.get(id)
       val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
         .asScala.flatMap(_.asScala)
       allContainerRequests(id) = result
@@ -336,7 +338,7 @@ private[yarn] class YarnAllocator(
       val resource = Resource.newInstance(totalMem, cores)
       ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
       logDebug(s"Created resource capability: $resource")
-      rpIdToYarnResource(rp.id) = resource
+      rpIdToYarnResource.putIfAbsent(rp.id, resource)
       rpIdToResourceProfile(rp.id) = rp
     }
   }
@@ -467,7 +469,7 @@ private[yarn] class YarnAllocator(
         hostToLocalTaskCount, pendingAllocate)
 
       if (missing > 0) {
-        val resource = rpIdToYarnResource(rpId)
+        val resource = rpIdToYarnResource.get(rpId)
         if (log.isInfoEnabled()) {
           var requestContainerMessage = s"Will request $missing executor container(s) for " +
             s"ResourceProfile Id: $rpId, each with " +
@@ -584,28 +586,34 @@ private[yarn] class YarnAllocator(
    *
    * Visible for testing.
    */
-  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = synchronized {
     val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
 
     // Match incoming requests by host
-    val remainingAfterHostMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- allocatedContainers) {
-      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
+    val remainingAfterHostMatches = new ArrayBuffer[(Container, Resource)]
+    val allocatedContainerAndResource = allocatedContainers.map { case container =>
+      val rpId = getResourceProfileIdFromPriority(container.getPriority)
+      val resourceForRP = rpIdToYarnResource.get(rpId)
+      (container, resourceForRP)
+    }
+
+    for ((allocatedContainer, resource) <- allocatedContainerAndResource) {
+      matchContainerToRequest(allocatedContainer, resource, allocatedContainer.getNodeId.getHost,
         containersToUse, remainingAfterHostMatches)
     }
 
     // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
     // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
     // a separate thread to perform the operation.
-    val remainingAfterRackMatches = new ArrayBuffer[Container]
+    val remainingAfterRackMatches = new ArrayBuffer[(Container, Resource)]
     if (remainingAfterHostMatches.nonEmpty) {
       var exception: Option[Throwable] = None
       val thread = new Thread("spark-rack-resolver") {
         override def run(): Unit = {
           try {
-            for (allocatedContainer <- remainingAfterHostMatches) {
+            for ((allocatedContainer, resource) <- remainingAfterHostMatches) {
               val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
-              matchContainerToRequest(allocatedContainer, rack, containersToUse,
+              matchContainerToRequest(allocatedContainer, resource, rack, containersToUse,
                 remainingAfterRackMatches)
             }
           } catch {
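
The SPARK-27094 comment in this hunk explains why rack resolution runs on its own thread: the resolver can swallow interrupts, so the blocking call is isolated, any failure is rethrown after join(), and an interrupt aimed at the caller still surfaces from join() itself. A small self-contained sketch of that pattern (the object and method names here are hypothetical, not Spark's):

object IsolatedCallSketch {
  // Runs `body` on a short-lived helper thread. If `body` swallows interrupts,
  // an interrupt delivered to the caller still surfaces from join(); any
  // failure inside `body` is captured and rethrown on the caller afterwards.
  def runOnHelperThread(name: String)(body: => Unit): Unit = {
    var exception: Option[Throwable] = None
    val thread = new Thread(name) {
      override def run(): Unit = {
        try {
          body
        } catch {
          case e: Throwable => exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()
    thread.join()
    exception.foreach(e => throw e)
  }

  def main(args: Array[String]): Unit = {
    runOnHelperThread("sketch-rack-resolver") {
      println("resolving racks on a helper thread")
    }
  }
}
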
@@ -631,16 +639,16 @@ private[yarn] class YarnAllocator(
     }
 
     // Assign remaining that are neither node-local nor rack-local
-    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- remainingAfterRackMatches) {
-      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
+    val remainingAfterOffRackMatches = new ArrayBuffer[(Container, Resource)]
+    for ((allocatedContainer, resource) <- remainingAfterRackMatches) {
+      matchContainerToRequest(allocatedContainer, resource, ANY_HOST, containersToUse,
         remainingAfterOffRackMatches)
     }
 
     if (remainingAfterOffRackMatches.nonEmpty) {
       logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
         s"allocated to us")
-      for (container <- remainingAfterOffRackMatches) {
+      for ((container, _) <- remainingAfterOffRackMatches) {
         internalReleaseContainer(container)
       }
     }
@@ -651,35 +659,37 @@ private[yarn] class YarnAllocator(
       .format(allocatedContainers.size, containersToUse.size))
   }
 
+  /* private def validateYarnContainerResources(
+      allocatedContainer: Container,
+      resource: Resource): Boolean = {
+
+  } */
+
   /**
    * Looks for requests for the given location that match the given container allocation. If it
    * finds one, removes the request so that it won't be submitted again. Places the container into
    * containersToUse or remaining.
    *
    * @param allocatedContainer container that was given to us by YARN
+   * @param resource yarn resource used to request the container
   * @param location resource name, either a node, rack, or *
   * @param containersToUse list of containers that will be used
-  * @param remaining list of containers that will not be used
+  * @param remaining list of containers and their corresponding resource that will not be used
   */
   private def matchContainerToRequest(
       allocatedContainer: Container,
+      resource: Resource,
       location: String,
       containersToUse: ArrayBuffer[Container],
-      remaining: ArrayBuffer[Container]): Unit = synchronized {
+      remaining: ArrayBuffer[(Container, Resource)]): Unit = {
     val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
-    // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
-    // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
-    // memory, but use the asked vcore count for matching, effectively disabling matching on vcore
-    // count.
-
-    // this should be exactly what we requested
-    val resourceForRP = rpIdToYarnResource(rpId)
+    val resourceForRP = rpIdToYarnResource.get(rpId)
 
     logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
       s"priority: ${allocatedContainer.getPriority}, " +
-      s"location: $location, resource: $resourceForRP")
+      s"location: $location, resource: $resource")
     val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
-      resourceForRP)
+      resource)
 
     // Match the allocation to a request
     if (!matchingRequests.isEmpty) {
@@ -688,7 +698,7 @@ private[yarn] class YarnAllocator(
         amClient.removeContainerRequest(containerRequest)
         containersToUse += allocatedContainer
       } else {
-        remaining += allocatedContainer
+        remaining += ((allocatedContainer, resource))
       }
     }
 
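
Across the matching hunks above, the patch threads the requesting Resource alongside each still-unmatched Container, so the later and less specific passes (rack, then ANY_HOST) retry with the same capability instead of looking it up again. A simplified sketch of that hand-off; FakeContainer and FakeResource are hypothetical stand-ins, not the YARN classes:

import scala.collection.mutable.ArrayBuffer

final case class FakeContainer(host: String)
final case class FakeResource(memoryMb: Int, vcores: Int)

object MatchingPassSketch {
  // One matching pass: containers whose host equals `location` are accepted;
  // the rest are carried forward as (container, resource) pairs so the next
  // pass can retry the same resource at a coarser location.
  def matchPass(
      candidates: Seq[(FakeContainer, FakeResource)],
      location: String,
      containersToUse: ArrayBuffer[FakeContainer],
      remaining: ArrayBuffer[(FakeContainer, FakeResource)]): Unit = {
    for ((container, resource) <- candidates) {
      if (container.host == location) {
        containersToUse += container
      } else {
        remaining += ((container, resource))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val toUse = new ArrayBuffer[FakeContainer]
    val leftover = new ArrayBuffer[(FakeContainer, FakeResource)]
    val allocated = Seq(
      (FakeContainer("host1"), FakeResource(4096, 2)),
      (FakeContainer("host2"), FakeResource(8192, 4)))
    matchPass(allocated, "host1", toUse, leftover)
    println(s"use: $toUse, retry later: $leftover")
  }
}
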
@@ -702,7 +712,7 @@ private[yarn] class YarnAllocator(
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
       val executorId = executorIdCounter.toString
-      val yarnResourceForRpId = rpIdToYarnResource(rpId)
+      val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
       assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId for ResourceProfile Id $rpId")