From a0dcde583c76cb96f5112f4ff863874415ec9140 Mon Sep 17 00:00:00 2001
From: pgandhi
Date: Fri, 24 Aug 2018 10:27:01 -0500
Subject: [PATCH] [SPARK-25231] : Executor Heartbeat Receiver does not need to
 synchronize on the TaskSchedulerImpl object

The main reason for the heartbeat timeouts was that the
heartbeat-receiver-event-loop-thread was blocked waiting on the
TaskSchedulerImpl object, which was being held by one of the
dispatcher-event-loop threads executing the method dequeueSpeculativeTasks()
in TaskSetManager.scala. Further analysis of the heartbeat receiver method
shows that there is no need to hold the lock on the whole object: the block
of code in the method only uses one global HashMap, taskIdToTaskSetManager.
By making that map a ConcurrentHashMap, we ensure that the individual map
operations are atomic and that the heartbeat receiver thread no longer
blocks on the scheduler lock.
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala   | 12 ++++++------
 .../cluster/CoarseGrainedSchedulerBackend.scala      |  2 +-
 .../spark/scheduler/SchedulerIntegrationSuite.scala  |  3 ++-
 .../spark/scheduler/TaskSchedulerImplSuite.scala     |  6 +++---
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8992d7e2284a4..8b71170668639 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{Locale, Timer, TimerTask}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
@@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // Protected by `this`
-  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
+  private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
   @volatile private var hasReceivedTask = false
@@ -315,7 +315,7 @@ private[spark] class TaskSchedulerImpl(
         for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
           tasks(i) += task
           val tid = task.taskId
-          taskIdToTaskSetManager(tid) = taskSet
+          taskIdToTaskSetManager.put(tid, taskSet)
           taskIdToExecutorId(tid) = execId
           executorIdToRunningTaskIds(execId).add(tid)
           availableCpus(i) -= CPUS_PER_TASK
@@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
     var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
-        taskIdToTaskSetManager.get(tid) match {
+        Option(taskIdToTaskSetManager.get(tid)) match {
           case Some(taskSet) =>
             if (state == TaskState.LOST) {
               // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
-    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
+    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
       accumUpdates.flatMap { case (id, updates) =>
         val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
-        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+        Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 747e8c7dc0fa5..de7c0d813ae65 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -290,7 +290,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       for (task <- tasks.flatten) {
         val serializedTask = TaskDescription.encode(task)
         if (serializedTask.limit() >= maxRpcMessageSize) {
-          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
+          Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                 "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index cea7f173c8f2f..2d409d94ca1b3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -400,7 +400,8 @@ private[spark] abstract class MockBackend(
     // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
     // tests from introducing a race if they need it.
     val newTasks = newTaskDescriptions.map { taskDescription =>
-      val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+      val taskSet =
+        Option(taskScheduler.taskIdToTaskSetManager.get(taskDescription.taskId).taskSet).get
       val task = taskSet.tasks(taskDescription.index)
       (taskDescription, task)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 7a457a0a72d90..9e1d13e369ad9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -248,7 +248,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(attempt2)
     val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
     assert(1 === taskDescriptions3.length)
-    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
+    val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId)).get
     assert(mgr.taskSet.stageAttemptId === 1)
     assert(!failedTaskSet)
   }
@@ -286,7 +286,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(10 === taskDescriptions3.length)
 
     taskDescriptions3.foreach { task =>
-      val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
+      val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(task.taskId)).get
       assert(mgr.taskSet.stageAttemptId === 1)
     }
     assert(!failedTaskSet)
@@ -724,7 +724,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     // only schedule one task because of locality
     assert(taskDescs.size === 1)
-    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
+    val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId)).get
     assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
     // we should know about both executors, even though we only scheduled tasks on one of them
     assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))
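
A note on the call-site changes above: java.util.concurrent.ConcurrentHashMap.get
returns null for a missing key, unlike scala.collection.mutable.HashMap.get,
which returns an Option. That is why the lookups are now wrapped in Option(...).
The minimal standalone sketch below illustrates the pattern; it is not part of
the patch, and FakeTaskSetManager / ConcurrentLookupSketch are made-up names
standing in for Spark's own classes.

    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical stand-in for Spark's TaskSetManager, used only in this sketch.
    case class FakeTaskSetManager(stageId: Int)

    object ConcurrentLookupSketch {

      // Thread-safe map keyed by task id; individual get/put calls are atomic
      // without holding a lock on the enclosing object.
      private val taskIdToTaskSetManager = new ConcurrentHashMap[Long, FakeTaskSetManager]()

      def main(args: Array[String]): Unit = {
        taskIdToTaskSetManager.put(1L, FakeTaskSetManager(stageId = 0))

        // get returns null when the key is absent, so wrap the result in
        // Option(...) before pattern matching or mapping, as the patch does.
        Option(taskIdToTaskSetManager.get(1L)) match {
          case Some(mgr) => println(s"found manager for stage ${mgr.stageId}")
          case None => println("no manager registered for this task id")
        }

        // A missing key yields None rather than a NullPointerException.
        assert(Option(taskIdToTaskSetManager.get(42L)).isEmpty)
      }
    }

ConcurrentHashMap only guarantees atomicity of each individual get/put call; a
compound check-then-act sequence would still need external synchronization (or
computeIfAbsent). The heartbeat path in executorHeartbeatReceived performs only
single lookups per task id, which is why dropping the synchronized block there
is safe.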