@@ -120,6 +120,8 @@ private[yarn] class YarnAllocator(
   @GuardedBy("this")
   private val containerIdToExecutorIdAndResourceProfileId = new HashMap[ContainerId, (String, Int)]
 
+  // Use a ConcurrentHashMap because this is used in matchContainerToRequest, which is called
+  // from the rack resolver thread, where synchronized(this) would cause a deadlock.
   @GuardedBy("ConcurrentHashMap")
   private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
 
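The new comment is the key constraint behind this change: `matchContainerToRequest` now reads `rpIdToYarnResource` from the `spark-rack-resolver` thread while the allocator thread may hold the allocator's monitor and block waiting for that thread, so a `synchronized(this)` read there could deadlock. A minimal standalone sketch of the hazard, with hypothetical names (not Spark code):

```scala
import java.util.concurrent.ConcurrentHashMap

object DeadlockSketch {
  private val lock = new Object
  // Thread-safe map: readable from the helper thread without taking `lock`.
  private val rpIdToResource = new ConcurrentHashMap[Int, String]()

  def main(args: Array[String]): Unit = lock.synchronized {
    rpIdToResource.put(0, "<memory:1024, vCores:1>")
    val helper = new Thread(() => {
      // Fine: ConcurrentHashMap.get needs no external locking.
      println(rpIdToResource.get(0))
      // lock.synchronized { ... } here would deadlock: the main thread
      // still holds `lock` while it blocks in join() below.
    })
    helper.start()
    helper.join()
  }
}
```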
@@ -586,34 +588,28 @@ private[yarn] class YarnAllocator(
    *
    * Visible for testing.
    */
-  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = synchronized {
+  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
     val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
 
     // Match incoming requests by host
-    val remainingAfterHostMatches = new ArrayBuffer[(Container, Resource)]
-    val allocatedContainerAndResource = allocatedContainers.map { case container =>
-      val rpId = getResourceProfileIdFromPriority(container.getPriority)
-      val resourceForRP = rpIdToYarnResource.get(rpId)
-      (container, resourceForRP)
-    }
-
-    for ((allocatedContainer, resource) <- allocatedContainerAndResource) {
-      matchContainerToRequest(allocatedContainer, resource, allocatedContainer.getNodeId.getHost,
+    val remainingAfterHostMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- allocatedContainers) {
+      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
         containersToUse, remainingAfterHostMatches)
     }
 
     // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
     // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
     // a separate thread to perform the operation.
-    val remainingAfterRackMatches = new ArrayBuffer[(Container, Resource)]
+    val remainingAfterRackMatches = new ArrayBuffer[Container]
     if (remainingAfterHostMatches.nonEmpty) {
       var exception: Option[Throwable] = None
       val thread = new Thread("spark-rack-resolver") {
         override def run(): Unit = {
           try {
-            for ((allocatedContainer, resource) <- remainingAfterHostMatches) {
+            for (allocatedContainer <- remainingAfterHostMatches) {
               val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
-              matchContainerToRequest(allocatedContainer, resource, rack, containersToUse,
+              matchContainerToRequest(allocatedContainer, rack, containersToUse,
                 remainingAfterRackMatches)
             }
           } catch {
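This hunk leaves the SPARK-27094 workaround intact: Hadoop's `RackResolver` can swallow thread interrupts, so resolution runs on a dedicated thread and the caller waits in an interruptible `join()`. A hedged, generic sketch of that isolation pattern (the `runResolverIsolated` helper is hypothetical, not part of the commit):

```scala
// Run interrupt-swallowing work on a helper thread; the caller stays
// interruptible because Thread.join() throws InterruptedException.
def runResolverIsolated(work: () => Unit): Unit = {
  var failure: Option[Throwable] = None
  val thread = new Thread("spark-rack-resolver") {
    override def run(): Unit = {
      try {
        work()
      } catch {
        case t: Throwable => failure = Some(t)  // surface the error to the caller
      }
    }
  }
  thread.setDaemon(true)
  thread.start()
  thread.join()  // an AM interrupt lands here, not inside RackResolver
  failure.foreach(t => throw t)
}
```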
@@ -639,16 +635,16 @@ private[yarn] class YarnAllocator(
     }
 
     // Assign remaining that are neither node-local nor rack-local
-    val remainingAfterOffRackMatches = new ArrayBuffer[(Container, Resource)]
-    for ((allocatedContainer, resource) <- remainingAfterRackMatches) {
-      matchContainerToRequest(allocatedContainer, resource, ANY_HOST, containersToUse,
+    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- remainingAfterRackMatches) {
+      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
         remainingAfterOffRackMatches)
     }
 
     if (remainingAfterOffRackMatches.nonEmpty) {
       logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
         s"allocated to us")
-      for ((container, _) <- remainingAfterOffRackMatches) {
+      for (container <- remainingAfterOffRackMatches) {
         internalReleaseContainer(container)
       }
     }
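Taken together, the hunks above preserve the original three-pass cascade, with only the `(Container, Resource)` tupling removed: each pass matches what it can and feeds its leftovers to the next, and anything unmatched after the `ANY_HOST` pass is released back to YARN. In outline (abbreviated, not the verbatim method body):

```scala
// Pass 1: node-local matches.
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (c <- allocatedContainers) {
  matchContainerToRequest(c, c.getNodeId.getHost, containersToUse, remainingAfterHostMatches)
}
// Pass 2: rack-local matches (run on the spark-rack-resolver thread).
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (c <- remainingAfterHostMatches) {
  matchContainerToRequest(c, resolver.resolve(c.getNodeId.getHost),
    containersToUse, remainingAfterRackMatches)
}
// Pass 3: match anywhere.
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (c <- remainingAfterRackMatches) {
  matchContainerToRequest(c, ANY_HOST, containersToUse, remainingAfterOffRackMatches)
}
// Whatever is still unmatched goes back to YARN.
remainingAfterOffRackMatches.foreach(internalReleaseContainer)
```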
@@ -659,37 +655,32 @@ private[yarn] class YarnAllocator(
       .format(allocatedContainers.size, containersToUse.size))
   }
 
-  /* private def validateYarnContainerResources(
-      allocatedContainer: Container,
-      resource:Resource): Boolean = {
-
-  } */
-
   /**
    * Looks for requests for the given location that match the given container allocation. If it
    * finds one, removes the request so that it won't be submitted again. Places the container into
    * containersToUse or remaining.
    *
    * @param allocatedContainer container that was given to us by YARN
-   * @param resource yarn resource used to request the container
    * @param location resource name, either a node, rack, or *
    * @param containersToUse list of containers that will be used
-   * @param remaining list of containers and their corresponding resource that will not be used
+   * @param remaining list of containers that will not be used
    */
   private def matchContainerToRequest(
       allocatedContainer: Container,
-      resource: Resource,
       location: String,
       containersToUse: ArrayBuffer[Container],
-      remaining: ArrayBuffer[(Container, Resource)]): Unit = {
+      remaining: ArrayBuffer[Container]): Unit = {
+    // Match on the exact resource we requested, so there shouldn't be a mismatch;
+    // we rely on YARN to return a container with resources no less than we requested.
+    // If we change this, or start validating the container, be sure the logic covers SPARK-6050.
     val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
     val resourceForRP = rpIdToYarnResource.get(rpId)
 
     logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
       s"priority: ${allocatedContainer.getPriority}, " +
-      s"location: $location, resource: $resource")
+      s"location: $location, resource: $resourceForRP")
     val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
-      resource)
+      resourceForRP)
 
     // Match the allocation to a request
     if (!matchingRequests.isEmpty) {
@@ -698,7 +689,7 @@ private[yarn] class YarnAllocator(
       amClient.removeContainerRequest(containerRequest)
       containersToUse += allocatedContainer
     } else {
-      remaining += ((allocatedContainer, resource))
+      remaining += allocatedContainer
     }
   }
 
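For background on the SPARK-6050 caveat in the new comment: under the capacity scheduler with `DefaultResourceCalculator`, YARN accounts only for memory, so a granted container can report a vcore count different from what was requested. That is why matching uses the exact `Resource` recorded in `rpIdToYarnResource` rather than the granted container's capability. A hypothetical illustration (values invented):

```scala
import org.apache.hadoop.yarn.api.records.Resource

object Spark6050Sketch extends App {
  // What we asked for vs. what the RM may report back under
  // CapacityScheduler + DefaultResourceCalculator (illustrative values).
  val requested = Resource.newInstance(2048 /* MB */, 4 /* vcores */)
  val granted = Resource.newInstance(2048 /* MB */, 1 /* vcore */)

  // amClient keys outstanding requests by the capability we submitted, so
  // matching with `granted` could find nothing, while matching with
  // `requested` (the value stored in rpIdToYarnResource) always lines up
  // with what amClient has recorded.
  println(s"requested=$requested granted=$granted")
}
```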