From 97986788519b1ade8e5ab509719e39546d7ab03c Mon Sep 17 00:00:00 2001 From: Bo Xiong Date: Tue, 19 Sep 2023 21:07:49 -0700 Subject: [PATCH] [SPARK-45227][CORE] Fix an issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 11 +++++++---- .../executor/CoarseGrainedExecutorBackendSuite.scala | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 4903421f9063..8d89baf54a23 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -20,9 +20,9 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer import java.util.Locale +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal @@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend( /** * Map each taskId to the information about the resource allocated to it, Please refer to * [[ResourceInformation]] for specifics. + * CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227) * Exposed for testing only. */ - private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] + private[executor] val taskResources = new ConcurrentHashMap[ + Long, Map[String, ResourceInformation] + ] private var decommissioned = false @@ -184,7 +187,7 @@ private[spark] class CoarseGrainedExecutorBackend( } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) - taskResources(taskDesc.taskId) = taskDesc.resources + taskResources.put(taskDesc.taskId, taskDesc.resources) executor.launchTask(this, taskDesc) } @@ -261,7 +264,7 @@ private[spark] class CoarseGrainedExecutorBackend( } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = { - val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation]) + val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation]) val msg = StatusUpdate(executorId, taskId, state, data, resources) if (TaskState.isFinished(state)) { taskResources.remove(taskId) diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index a12b7034a6df..c6cd7560c8c7 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -300,7 +300,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)) assert(backend.taskResources.isEmpty) - val taskId = 1000000 + val taskId = 1000000L // We don't really verify the data, just pass it around. val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, @@ -314,14 +314,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription))) eventually(timeout(10.seconds)) { assert(backend.taskResources.size == 1) - val resources = backend.taskResources(taskId) + val resources = backend.taskResources.get(taskId) assert(resources(GPU).addresses sameElements Array("0", "1")) } // Update the status of a running task shall not affect `taskResources` map. backend.statusUpdate(taskId, TaskState.RUNNING, data) assert(backend.taskResources.size == 1) - val resources = backend.taskResources(taskId) + val resources = backend.taskResources.get(taskId) assert(resources(GPU).addresses sameElements Array("0", "1")) // Update the status of a finished task shall remove the entry from `taskResources` map.