Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

emit shuffle pod watch exception #601

Open
wants to merge 1 commit into
base: branch-2.2-kubernetes
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -106,18 +106,14 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
}

private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized {
if (shufflePodCache.contains(pod.getSpec.getNodeName)) {
val registeredPodName = shufflePodCache.get(pod.getSpec.getNodeName).get
logError(s"Ambiguous specification of shuffle service pod. " +
s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
s"${registeredPodName} on ${pod.getSpec.getNodeName}")

throw new SparkException(s"Ambiguous specification of shuffle service pod. " +
s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
s"${registeredPodName} on ${pod.getSpec.getNodeName}")
} else {
shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
if (shufflePodCache.exists(kv => kv._1 == pod.getSpec.getNodeName
&& kv._2 != pod.getStatus.getPodIP)) {
val registeredPodIP = shufflePodCache(pod.getSpec.getNodeName)
logWarning(s"Ambiguous specification of shuffle service pod. " +
s"Found multiple matching pods: ${pod.getMetadata.getName}(${pod.getStatus.getPodIP}), " +
s"$registeredPodIP on ${pod.getSpec.getNodeName}, will update")
}
shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
}

override def stop(): Unit = {