Skip to content

Commit 3584a09

Browse files
committed
add task attempt running check in hasAttemptOnHost
1 parent 6ade5cb commit 3584a09

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ private[spark] class TaskSetManager(
289289

290290
/** Check whether a task is currently running an attempt on a given host */
291291
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
292-
taskAttempts(taskIndex).exists(_.host == host)
292+
taskAttempts(taskIndex).exists { info => info.running && info.host == host }
293293
}
294294

295295
private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = {

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,59 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
880880
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
881881
}
882882

883+
test("speculative task should not run on a given host where another attempt " +
884+
"is already running on") {
885+
sc = new SparkContext("local", "test")
886+
sched = new FakeTaskScheduler(
887+
sc, ("execA", "host1"), ("execB", "host2"))
888+
val taskSet = FakeTask.createTaskSet(1,
889+
Seq(TaskLocation("host1", "execA"), TaskLocation("host2", "execB")))
890+
val clock = new ManualClock
891+
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
892+
893+
// let task0.0 run on host1
894+
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index == 0)
895+
val info1 = manager.taskAttempts(0)(0)
896+
assert(info1.running === true)
897+
assert(info1.host === "host1")
898+
899+
// a long time elapses, and task0.0 is still running,
900+
// so we launch a speculative task0.1 on host2
901+
clock.advance(1000)
902+
manager.speculatableTasks += 0
903+
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 0)
904+
val info2 = manager.taskAttempts(0)(0)
905+
assert(info2.running === true)
906+
assert(info2.host === "host2")
907+
assert(manager.speculatableTasks.size === 0)
908+
909+
// now, task0 has two copies running, on host1 and host2 respectively,
910+
// so we cannot launch another speculative task on either host.
911+
manager.speculatableTasks += 0
912+
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) === None)
913+
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None)
914+
assert(manager.speculatableTasks.size === 1)
915+
916+
// after a very long time, task0.0 fails, and it cannot be re-run since
917+
// there's already a running copy.
918+
clock.advance(1000)
919+
info1.finishTime = clock.getTimeMillis()
920+
assert(info1.running === false)
921+
922+
// time goes on, and task0.1 is still running
923+
clock.advance(1000)
924+
// so we try to launch a new speculative task
925+
// we cannot run it on host2, because task0.1 is already running there
926+
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None)
927+
// we successfully launch a speculative task0.2 on host1, since there's
928+
// no longer a running copy of task0 on that host
929+
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
930+
val info3 = manager.taskAttempts(0)(0)
931+
assert(info3.running === true)
932+
assert(info3.host === "host1")
933+
assert(manager.speculatableTasks.size === 0)
934+
}
935+
883936
test("node-local tasks should be scheduled right away " +
884937
"when there are only node-local and no-preference tasks") {
885938
sc = new SparkContext("local", "test")

0 commit comments

Comments
 (0)