Skip to content

Commit bcfa1f0

Browse files
author
John Lee
committed
unit test reworked
1 parent 3c2cf90 commit bcfa1f0

File tree

1 file changed

+85
-34
lines changed

1 file changed

+85
-34
lines changed

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite
314314
assert(executorsPendingToRemove(manager).isEmpty)
315315
}
316316

317+
test ("Removing with various numExecutorsTarget condition") {
318+
sc = createSparkContext(5, 12, 5)
319+
val manager = sc.executorAllocationManager.get
320+
321+
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8)))
322+
323+
// Remove when numExecutorsTarget is the same as the current number of executors
324+
assert(addExecutors(manager) === 1)
325+
assert(addExecutors(manager) === 2)
326+
(1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
327+
info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
328+
assert(executorIds(manager).size === 8)
329+
assert(numExecutorsTarget(manager) === 8)
330+
assert(maxNumExecutorsNeeded(manager) == 8)
331+
assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors
332+
333+
// Remove executors when numExecutorsTarget is lower than current number of executors
334+
(1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
335+
info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) }
336+
adjustRequestedExecutors(manager)
337+
assert(executorIds(manager).size === 8)
338+
assert(numExecutorsTarget(manager) === 5)
339+
assert(maxNumExecutorsNeeded(manager) == 5)
340+
assert(removeExecutor(manager, "1"))
341+
assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3"))
342+
onExecutorRemoved(manager, "1")
343+
onExecutorRemoved(manager, "2")
344+
onExecutorRemoved(manager, "3")
345+
346+
// numExecutorsTarget is lower than minNumExecutors
347+
sc.listenerBus.postToAll(
348+
SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
349+
assert(executorIds(manager).size === 5)
350+
assert(numExecutorsTarget(manager) === 5)
351+
assert(maxNumExecutorsNeeded(manager) == 4)
352+
assert(!removeExecutor(manager, "4")) // lower limit
353+
assert(addExecutors(manager) === 0) // upper limit
354+
}
355+
317356
test ("interleaving add and remove") {
318-
sc = createSparkContext(5, 10, 5)
357+
sc = createSparkContext(5, 12, 5)
319358
val manager = sc.executorAllocationManager.get
320359
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
321360

@@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite
331370
onExecutorAdded(manager, "7")
332371
onExecutorAdded(manager, "8")
333372
assert(executorIds(manager).size === 8)
373+
assert(numExecutorsTarget(manager) === 8)
334374

335-
// Remove until limit
336-
assert(removeExecutor(manager, "1"))
337-
assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
338-
assert(!removeExecutor(manager, "4")) // lower limit reached
339-
assert(!removeExecutor(manager, "5"))
340-
onExecutorRemoved(manager, "1")
341-
onExecutorRemoved(manager, "2")
342-
onExecutorRemoved(manager, "3")
343-
assert(executorIds(manager).size === 5)
344375

345-
// Add until limit
346-
assert(addExecutors(manager) === 2) // upper limit reached
347-
assert(addExecutors(manager) === 0)
348-
assert(!removeExecutor(manager, "4")) // still at lower limit
349-
assert((manager, Seq("5")) !== Seq("5"))
376+
// Remove when numTargetExecutors is equal to the current number of executors
377+
assert(!removeExecutor(manager, "1"))
378+
assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3"))
379+
380+
// Remove until limit
350381
onExecutorAdded(manager, "9")
351382
onExecutorAdded(manager, "10")
352383
onExecutorAdded(manager, "11")
353384
onExecutorAdded(manager, "12")
354-
onExecutorAdded(manager, "13")
355-
assert(executorIds(manager).size === 10)
385+
assert(executorIds(manager).size === 12)
386+
assert(numExecutorsTarget(manager) === 8)
356387

357-
// Remove succeeds again, now that we are no longer at the lower limit
358-
assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
359-
assert(removeExecutor(manager, "7"))
360-
assert(executorIds(manager).size === 10)
361-
assert(addExecutors(manager) === 0)
388+
assert(removeExecutor(manager, "1"))
389+
assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
390+
assert(!removeExecutor(manager, "5")) // lower limit reached
391+
assert(!removeExecutor(manager, "6"))
392+
onExecutorRemoved(manager, "1")
393+
onExecutorRemoved(manager, "2")
394+
onExecutorRemoved(manager, "3")
362395
onExecutorRemoved(manager, "4")
363-
onExecutorRemoved(manager, "5")
364396
assert(executorIds(manager).size === 8)
365397

366-
// Number of executors pending restarts at 1
367-
assert(numExecutorsToAdd(manager) === 1)
368-
assert(addExecutors(manager) === 0)
369-
assert(executorIds(manager).size === 8)
370-
onExecutorRemoved(manager, "6")
371-
onExecutorRemoved(manager, "7")
398+
// Add until limit
399+
assert(!removeExecutor(manager, "7")) // still at lower limit
400+
assert((manager, Seq("8")) !== Seq("8"))
401+
onExecutorAdded(manager, "13")
372402
onExecutorAdded(manager, "14")
373403
onExecutorAdded(manager, "15")
374-
assert(executorIds(manager).size === 8)
375-
assert(addExecutors(manager) === 0) // still at upper limit
376404
onExecutorAdded(manager, "16")
405+
assert(executorIds(manager).size === 12)
406+
407+
// Remove succeeds again, now that we are no longer at the lower limit
408+
assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
409+
assert(removeExecutor(manager, "8"))
410+
assert(executorIds(manager).size === 12)
411+
onExecutorRemoved(manager, "5")
412+
onExecutorRemoved(manager, "6")
413+
assert(executorIds(manager).size === 10)
414+
assert(numExecutorsToAdd(manager) === 4)
415+
onExecutorRemoved(manager, "9")
416+
onExecutorRemoved(manager, "10")
417+
assert(addExecutors(manager) === 4) // at upper limit
377418
onExecutorAdded(manager, "17")
419+
onExecutorAdded(manager, "18")
378420
assert(executorIds(manager).size === 10)
379-
assert(numExecutorsTarget(manager) === 10)
421+
assert(addExecutors(manager) === 0) // still at upper limit
422+
onExecutorAdded(manager, "19")
423+
onExecutorAdded(manager, "20")
424+
assert(executorIds(manager).size === 12)
425+
assert(numExecutorsTarget(manager) === 12)
380426
}
381427

382428
test("starting/canceling add timer") {
@@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite
915961
onExecutorAdded(manager, "third")
916962
onExecutorAdded(manager, "fourth")
917963
onExecutorAdded(manager, "fifth")
918-
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
964+
onExecutorAdded(manager, "sixth")
965+
onExecutorAdded(manager, "seventh")
966+
onExecutorAdded(manager, "eighth")
967+
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
968+
"sixth", "seventh", "eighth"))
919969

920970
removeExecutor(manager, "first")
921971
removeExecutors(manager, Seq("second", "third"))
922972
assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
923-
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
973+
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
974+
"sixth", "seventh", "eighth"))
924975

925976

926977
// Cluster manager lost will make all the live executors lost, so here simulate this behavior

0 commit comments

Comments
 (0)