Skip to content

Commit 6072fcc

Browse files
lianhuiwangAndrew Or
authored andcommitted
[SPARK-5593][Core]Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener
More strictly, in ExecutorAllocationListener, we need to replace onBlockManagerAdded, onBlockManagerRemoved with onExecutorAdded,onExecutorRemoved. because at some time, onExecutorAdded and onExecutorRemoved are more accurate to express these meanings. example at SPARK-5529, BlockManager has been removed,but executor is existed. andrewor14 sryza Author: lianhuiwang <lianhuiwang09@gmail.com> Closes #4369 from lianhuiwang/SPARK-5593 and squashes the following commits: 333367c [lianhuiwang] Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener
1 parent 9792bec commit 6072fcc

File tree

2 files changed

+19
-22
lines changed

2 files changed

+19
-22
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
486486
}
487487
}
488488

489-
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
490-
val executorId = blockManagerAdded.blockManagerId.executorId
489+
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
490+
val executorId = executorAdded.executorId
491491
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
492492
// This guards against the race condition in which the `SparkListenerTaskStart`
493493
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
498498
}
499499
}
500500

501-
override def onBlockManagerRemoved(
502-
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
503-
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
501+
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
502+
allocationManager.onExecutorRemoved(executorRemoved.executorId)
504503
}
505504

506505
/**

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.collection.mutable
2222
import org.scalatest.{FunSuite, PrivateMethodTester}
2323
import org.apache.spark.executor.TaskMetrics
2424
import org.apache.spark.scheduler._
25-
import org.apache.spark.storage.BlockManagerId
25+
import org.apache.spark.scheduler.cluster.ExecutorInfo
2626

2727
/**
2828
* Test add and remove behavior of ExecutorAllocationManager.
@@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
144144

145145
// Verify that running a task reduces the cap
146146
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
147-
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
148-
0L, BlockManagerId("executor-1", "host1", 1), 100L))
147+
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
148+
0L, "executor-1", new ExecutorInfo("host1", 1)))
149149
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
150150
assert(numExecutorsPending(manager) === 4)
151151
assert(addExecutors(manager) === 1)
@@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
578578
assert(removeTimes(manager).isEmpty)
579579

580580
// New executors have registered
581-
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
582-
0L, BlockManagerId("executor-1", "host1", 1), 100L))
581+
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
582+
0L, "executor-1", new ExecutorInfo("host1", 1)))
583583
assert(executorIds(manager).size === 1)
584584
assert(executorIds(manager).contains("executor-1"))
585585
assert(removeTimes(manager).size === 1)
586586
assert(removeTimes(manager).contains("executor-1"))
587-
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
588-
0L, BlockManagerId("executor-2", "host2", 1), 100L))
587+
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
588+
0L, "executor-2", new ExecutorInfo("host2", 1)))
589589
assert(executorIds(manager).size === 2)
590590
assert(executorIds(manager).contains("executor-2"))
591591
assert(removeTimes(manager).size === 2)
592592
assert(removeTimes(manager).contains("executor-2"))
593593

594594
// Existing executors have disconnected
595-
sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
596-
0L, BlockManagerId("executor-1", "host1", 1)))
595+
sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", ""))
597596
assert(executorIds(manager).size === 1)
598597
assert(!executorIds(manager).contains("executor-1"))
599598
assert(removeTimes(manager).size === 1)
600599
assert(!removeTimes(manager).contains("executor-1"))
601600

602601
// Unknown executor has disconnected
603-
sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
604-
0L, BlockManagerId("executor-3", "host3", 1)))
602+
sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", ""))
605603
assert(executorIds(manager).size === 1)
606604
assert(removeTimes(manager).size === 1)
607605
}
@@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
613611
assert(removeTimes(manager).isEmpty)
614612

615613
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
616-
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
617-
0L, BlockManagerId("executor-1", "host1", 1), 100L))
614+
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
615+
0L, "executor-1", new ExecutorInfo("host1", 1)))
618616
assert(executorIds(manager).size === 1)
619617
assert(executorIds(manager).contains("executor-1"))
620618
assert(removeTimes(manager).size === 0)
@@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
625623
val manager = sc.executorAllocationManager.get
626624
assert(executorIds(manager).isEmpty)
627625
assert(removeTimes(manager).isEmpty)
628-
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
629-
0L, BlockManagerId("executor-1", "host1", 1), 100L))
626+
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
627+
0L, "executor-1", new ExecutorInfo("host1", 1)))
630628
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
631629

632630
assert(executorIds(manager).size === 1)
633631
assert(executorIds(manager).contains("executor-1"))
634632
assert(removeTimes(manager).size === 0)
635633

636-
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
637-
0L, BlockManagerId("executor-2", "host1", 1), 100L))
634+
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
635+
0L, "executor-2", new ExecutorInfo("host1", 1)))
638636
assert(executorIds(manager).size === 2)
639637
assert(executorIds(manager).contains("executor-2"))
640638
assert(removeTimes(manager).size === 1)

0 commit comments

Comments
 (0)