41 commits
c8f1ef9
Change blacklist to use exclude
tgravescs Sep 15, 2020
803c8cc
more changes
tgravescs Sep 17, 2020
a9710dc
Continue renaming blacklisting, start ui changes
tgravescs Sep 28, 2020
26aa116
more renames
tgravescs Sep 28, 2020
73b997f
update tests and revert some of the listener events so history files …
tgravescs Sep 28, 2020
d85f2b5
add release to deprecated and fix compilation
tgravescs Sep 28, 2020
f8b042a
Fix test reference blacklist
tgravescs Sep 28, 2020
b0d1c72
Fix one more reference
tgravescs Sep 28, 2020
57a4ac9
minor fixes
tgravescs Sep 28, 2020
83a2b94
Fix test text
tgravescs Sep 29, 2020
eea7bdf
test fixes
tgravescs Sep 29, 2020
9f6b813
revert
tgravescs Sep 29, 2020
1155c1d
revert some changes for backwards compatibility
tgravescs Sep 29, 2020
12d99aa
Fix test
tgravescs Sep 29, 2020
b3f932f
test fixes
tgravescs Sep 29, 2020
f319b1b
a few comments missed
tgravescs Sep 29, 2020
6657785
rename a few hive blacklist variables
tgravescs Sep 29, 2020
9e5ace6
Fix scalastyle and update review comments. Revert Hive thriftserver
tgravescs Sep 30, 2020
1a3237a
renames blocked to excluded
tgravescs Sep 30, 2020
76714e7
Update blocklist to excluded
tgravescs Sep 30, 2020
08946ab
Merge remote-tracking branch 'upstream/master' into SPARK-32037
tgravescs Sep 30, 2020
6da870b
Update new test file
tgravescs Sep 30, 2020
4636159
Start addressing review comments
tgravescs Oct 7, 2020
0172aa4
Rework, remove the on*Blacklisted* functions and handle history events
tgravescs Oct 7, 2020
fcc3558
Merge remote-tracking branch 'upstream/master' into SPARK-32037
tgravescs Oct 7, 2020
5ed326d
Fix scala style
tgravescs Oct 7, 2020
32fca2d
Add mima excludes for the on*Blacklist removals
tgravescs Oct 8, 2020
dbc0ff1
Fix location and syntax in mima excludes
tgravescs Oct 8, 2020
3ebf881
Fix syntax
tgravescs Oct 8, 2020
34bf3c4
Revert "Fix syntax"
tgravescs Oct 19, 2020
b884ec3
Revert "Fix location and syntax in mima excludes"
tgravescs Oct 19, 2020
f9a8951
Revert "Add mima excludes for the on*Blacklist removals"
tgravescs Oct 19, 2020
05ac1ce
Revert "Rework, remove the on*Blacklisted* functions and handle histo…
tgravescs Oct 19, 2020
89d4fd2
deprecated and misspelled rework
tgravescs Oct 19, 2020
a27ac94
Post both types of messages for now until we totally remove blacklist
tgravescs Oct 19, 2020
b344ef6
update since version on configs
tgravescs Oct 19, 2020
8241e61
Add DeveloperApi to SparkFirehoseListener
tgravescs Oct 22, 2020
3446fb0
Fix bug in blacklist metrics incrementing and minor fixes and documen…
tgravescs Oct 22, 2020
f20a75d
Fix typo
tgravescs Oct 22, 2020
32ab73d
Merge remote-tracking branch 'upstream/master' into SPARK-32037
tgravescs Oct 22, 2020
b38dd66
fix missing deprecated configs and minor issues
tgravescs Oct 28, 2020
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -17,6 +17,7 @@

package org.apache.spark;

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.scheduler.*;

/**
@@ -27,7 +28,11 @@
* new methods to SparkListener: forgetting to add a method will result in a compilation error (if
* this was a concrete Scala class, default implementations of new event handlers would be inherited
* from the SparkListener trait).
*
* Please note that until Spark 3.1.0 this class was missing the DeveloperApi annotation; this
* needs to be taken into account if changing this API before a major release.
*/
@DeveloperApi
public class SparkFirehoseListener implements SparkListenerInterface {

public void onEvent(SparkListenerEvent event) { }
@@ -124,34 +129,67 @@ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
onEvent(executorBlacklisted);
}

@Override
public final void onExecutorExcluded(SparkListenerExecutorExcluded executorExcluded) {
onEvent(executorExcluded);
}

@Override
public void onExecutorBlacklistedForStage(
SparkListenerExecutorBlacklistedForStage executorBlacklistedForStage) {
onEvent(executorBlacklistedForStage);
}

@Override
public void onExecutorExcludedForStage(
SparkListenerExecutorExcludedForStage executorExcludedForStage) {
onEvent(executorExcludedForStage);
}

@Override
public void onNodeBlacklistedForStage(
SparkListenerNodeBlacklistedForStage nodeBlacklistedForStage) {
onEvent(nodeBlacklistedForStage);
}

@Override
public void onNodeExcludedForStage(
SparkListenerNodeExcludedForStage nodeExcludedForStage) {
onEvent(nodeExcludedForStage);
}

@Override
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
onEvent(executorUnblacklisted);
}

@Override
public final void onExecutorUnexcluded(
SparkListenerExecutorUnexcluded executorUnexcluded) {
onEvent(executorUnexcluded);
}

@Override
public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
onEvent(nodeBlacklisted);
}

@Override
public final void onNodeExcluded(SparkListenerNodeExcluded nodeExcluded) {
onEvent(nodeExcluded);
}

@Override
public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
onEvent(nodeUnblacklisted);
}

@Override
public final void onNodeUnexcluded(SparkListenerNodeUnexcluded nodeUnexcluded) {
onEvent(nodeUnexcluded);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
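Note that commit a27ac94 keeps posting the deprecated Blacklisted events alongside the new Excluded events until blacklist support is fully removed, so a firehose listener receives each exclusion under both names. A minimal Scala sketch of a listener that counts exclusions without double counting (the class name and the skip-the-deprecated-twin strategy are assumptions of this sketch, not part of the PR):

```scala
import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.{SparkListenerEvent,
  SparkListenerExecutorBlacklisted, SparkListenerExecutorExcluded}

// Counts executor exclusions seen through the firehose listener. During
// the transition one exclusion produces both event types, so only the
// new Excluded event is counted and its deprecated twin is ignored.
class ExclusionCountingListener extends SparkFirehoseListener {
  // Spark delivers events to each listener serially, so a plain var suffices.
  private var excludedCount = 0

  override def onEvent(event: SparkListenerEvent): Unit = event match {
    case _: SparkListenerExecutorExcluded    => excludedCount += 1
    case _: SparkListenerExecutorBlacklisted => () // deprecated duplicate; skip
    case _                                   => () // not relevant to this sketch
  }

  def currentCount: Int = excludedCount
}
```

Register it with `sc.addSparkListener(new ExclusionCountingListener)`; because the class extends the firehose listener, newly added handlers such as `onExecutorExcluded` above all funnel into `onEvent`.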
@@ -56,8 +56,8 @@ <h4 class="title-table">Summary</h4>
</th>
<th>
<span data-toggle="tooltip" data-placement="top"
title="Number of executors blacklisted by the scheduler due to task failures.">
Blacklisted</span>
title="Number of executors excluded by the scheduler due to task failures.">
Excluded</span>
</th>
</tr>
</thead>
28 changes: 14 additions & 14 deletions core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -26,15 +26,15 @@ function getThreadDumpEnabled() {
}

function formatStatus(status, type, row) {
if (row.isBlacklisted) {
return "Blacklisted";
if (row.isExcluded) {
return "Excluded";
}

if (status) {
if (row.blacklistedInStages.length == 0) {
if (row.excludedInStages.length == 0) {
return "Active"
}
return "Active (Blacklisted in Stages: [" + row.blacklistedInStages.join(", ") + "])";
return "Active (Excluded in Stages: [" + row.excludedInStages.join(", ") + "])";
}
return "Dead"
}
@@ -168,7 +168,7 @@ $(document).ready(function () {
var allTotalInputBytes = 0;
var allTotalShuffleRead = 0;
var allTotalShuffleWrite = 0;
var allTotalBlacklisted = 0;
var allTotalExcluded = 0;

var activeExecCnt = 0;
var activeRDDBlocks = 0;
@@ -190,7 +190,7 @@
var activeTotalInputBytes = 0;
var activeTotalShuffleRead = 0;
var activeTotalShuffleWrite = 0;
var activeTotalBlacklisted = 0;
var activeTotalExcluded = 0;

var deadExecCnt = 0;
var deadRDDBlocks = 0;
@@ -212,7 +212,7 @@
var deadTotalInputBytes = 0;
var deadTotalShuffleRead = 0;
var deadTotalShuffleWrite = 0;
var deadTotalBlacklisted = 0;
var deadTotalExcluded = 0;

response.forEach(function (exec) {
var memoryMetrics = {
@@ -246,7 +246,7 @@ $(document).ready(function () {
allTotalInputBytes += exec.totalInputBytes;
allTotalShuffleRead += exec.totalShuffleRead;
allTotalShuffleWrite += exec.totalShuffleWrite;
allTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
allTotalExcluded += exec.isExcluded ? 1 : 0;
if (exec.isActive) {
activeExecCnt += 1;
activeRDDBlocks += exec.rddBlocks;
@@ -268,7 +268,7 @@
activeTotalInputBytes += exec.totalInputBytes;
activeTotalShuffleRead += exec.totalShuffleRead;
activeTotalShuffleWrite += exec.totalShuffleWrite;
activeTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
activeTotalExcluded += exec.isExcluded ? 1 : 0;
} else {
deadExecCnt += 1;
deadRDDBlocks += exec.rddBlocks;
@@ -290,7 +290,7 @@
deadTotalInputBytes += exec.totalInputBytes;
deadTotalShuffleRead += exec.totalShuffleRead;
deadTotalShuffleWrite += exec.totalShuffleWrite;
deadTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
deadTotalExcluded += exec.isExcluded ? 1 : 0;
}
});

@@ -315,7 +315,7 @@
"allTotalInputBytes": allTotalInputBytes,
"allTotalShuffleRead": allTotalShuffleRead,
"allTotalShuffleWrite": allTotalShuffleWrite,
"allTotalBlacklisted": allTotalBlacklisted
"allTotalExcluded": allTotalExcluded
};
var activeSummary = {
"execCnt": ( "Active(" + activeExecCnt + ")"),
@@ -338,7 +338,7 @@ $(document).ready(function () {
"allTotalInputBytes": activeTotalInputBytes,
"allTotalShuffleRead": activeTotalShuffleRead,
"allTotalShuffleWrite": activeTotalShuffleWrite,
"allTotalBlacklisted": activeTotalBlacklisted
"allTotalExcluded": activeTotalExcluded
};
var deadSummary = {
"execCnt": ( "Dead(" + deadExecCnt + ")" ),
@@ -361,7 +361,7 @@ $(document).ready(function () {
"allTotalInputBytes": deadTotalInputBytes,
"allTotalShuffleRead": deadTotalShuffleRead,
"allTotalShuffleWrite": deadTotalShuffleWrite,
"allTotalBlacklisted": deadTotalBlacklisted
"allTotalExcluded": deadTotalExcluded
};

var data = {executors: response, "execSummary": [activeSummary, deadSummary, totalSummary]};
@@ -547,7 +547,7 @@ $(document).ready(function () {
{data: 'allTotalInputBytes', render: formatBytes},
{data: 'allTotalShuffleRead', render: formatBytes},
{data: 'allTotalShuffleWrite', render: formatBytes},
{data: 'allTotalBlacklisted'}
{data: 'allTotalExcluded'}
],
"paging": false,
"searching": false,
@@ -433,7 +433,7 @@ $(document).ready(function () {
{data : "failedTasks"},
{data : "killedTasks"},
{data : "succeededTasks"},
{data : "isBlacklistedForStage"},
{data : "isExcludedForStage"},
{
data : function (row, type) {
return row.inputRecords != 0 ? formatBytes(row.inputBytes, type) + " / " + row.inputRecords : "";
@@ -50,8 +50,8 @@ <h4 class="title-table">Aggregated Metrics by Executor</h4>
<th>Succeeded Tasks</th>
<th>
<span data-toggle="tooltip" data-placement="top"
title="Shows if this executor has been blacklisted by the scheduler due to task failures.">
Blacklisted</span>
title="Shows if this executor has been excluded by the scheduler due to task failures.">
Excluded</span>
</th>
<th><span id="executor-summary-input">Input Size / Records</span></th>
<th><span id="executor-summary-output">Output Size / Records</span></th>
@@ -312,8 +312,8 @@ private[spark] class ExecutorAllocationManager(

if (unschedulableTaskSets > 0) {
// Request additional executors to account for task sets having tasks that are unschedulable
// due to blacklisting when the active executor count has already reached the max needed
// which we would normally get.
// due to executors excluded for failures when the active executor count has already reached
// the max needed which we would normally get.
val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
tasksPerExecutor).toInt
math.max(maxNeededWithSpeculationLocalityOffset,
@@ -659,10 +659,10 @@ private[spark] class ExecutorAllocationManager(
private val resourceProfileIdToStageAttempt =
new mutable.HashMap[Int, mutable.Set[StageAttempt]]

// Keep track of unschedulable task sets due to blacklisting. This is a Set of StageAttempt's
// because we'll only take the last unschedulable task in a taskset although there can be more.
// This is done in order to avoid costly loops in the scheduling.
// Check TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details.
// Keep track of unschedulable task sets because of executor/node exclusions from too many task
// failures. This is a Set of StageAttempt's because we'll only take the last unschedulable task
// in a taskset although there can be more. This is done in order to avoid costly loops in the
// scheduling. Check TaskSetManager#getCompletelyExcludedTaskIfAny for more details.
private val unschedulableTaskSets = new mutable.HashSet[StageAttempt]

// stageAttempt to tuple (the number of task with locality preferences, a map where each pair
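The comments above describe the extra executor request made when task sets are unschedulable because their usable executors have been excluded. A worked sketch of the sizing rule with illustrative numbers (only the formula is taken from the hunk above; all inputs are assumptions):

```scala
// Worked example of maxNeededForUnschedulables; inputs are illustrative.
object UnschedulableSizingExample extends App {
  val unschedulableTaskSets = 2      // task sets stuck behind exclusions
  val executorAllocationRatio = 1.0  // spark.dynamicAllocation.executorAllocationRatio
  val tasksPerExecutor = 4           // executor cores / cores per task

  val maxNeededForUnschedulables =
    math.ceil(unschedulableTaskSets * executorAllocationRatio / tasksPerExecutor).toInt

  // ceil(2 * 1.0 / 4) = 1: one extra executor (4 task slots) beyond the
  // normal target covers the last unschedulable task of both task sets.
  println(s"extra executors: $maxNeededForUnschedulables")
}
```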
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -603,7 +603,7 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used anymore."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*"),
"Please use the new excludedOnFailure options, spark.excludeOnFailure.*"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
@@ -612,7 +612,31 @@
DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."),
DeprecatedConfig("spark.yarn.services", "3.0.0", "Feature no longer available."),
DeprecatedConfig("spark.executor.plugins", "3.0.0",
"Feature replaced with new plugin API. See Monitoring documentation.")
"Feature replaced with new plugin API. See Monitoring documentation."),
DeprecatedConfig("spark.blacklist.enabled", "3.1.0",
"Please use spark.excludeOnFailure.enabled"),
DeprecatedConfig("spark.blacklist.task.maxTaskAttemptsPerExecutor", "3.1.0",
"Please use spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor"),
DeprecatedConfig("spark.blacklist.task.maxTaskAttemptsPerNode", "3.1.0",
"Please use spark.excludeOnFailure.task.maxTaskAttemptsPerNode"),
DeprecatedConfig("spark.blacklist.application.maxFailedTasksPerExecutor", "3.1.0",
"Please use spark.excludeOnFailure.application.maxFailedTasksPerExecutor"),
DeprecatedConfig("spark.blacklist.stage.maxFailedTasksPerExecutor", "3.1.0",
"Please use spark.excludeOnFailure.stage.maxFailedTasksPerExecutor"),
DeprecatedConfig("spark.blacklist.application.maxFailedExecutorsPerNode", "3.1.0",
"Please use spark.excludeOnFailure.application.maxFailedExecutorsPerNode"),
DeprecatedConfig("spark.blacklist.stage.maxFailedExecutorsPerNode", "3.1.0",
"Please use spark.excludeOnFailure.stage.maxFailedExecutorsPerNode"),
DeprecatedConfig("spark.blacklist.timeout", "3.1.0",
"Please use spark.excludeOnFailure.timeout"),
DeprecatedConfig("spark.blacklist.application.fetchFailure.enabled", "3.1.0",
"Please use spark.excludeOnFailure.application.fetchFailure.enabled"),
DeprecatedConfig("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "3.1.0",
"Please use spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout"),
DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
"Please use spark.excludeOnFailure.killExcludedExecutors"),
DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
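Each DeprecatedConfig entry above supplies the warning SparkConf logs when the old key is set; whether the old key still takes effect depends on the alternative registered on the corresponding new ConfigEntry, which is not part of this hunk. A hedged migration sketch (the warning text in the comment approximates SparkConf's usual deprecation message shape):

```scala
import org.apache.spark.SparkConf

// Migrating from the deprecated blacklist keys to the excludeOnFailure
// keys registered above.
val conf = new SparkConf()
  .set("spark.excludeOnFailure.enabled", "true")
  .set("spark.excludeOnFailure.timeout", "1h")

// Setting the old key instead would log a warning along the lines of:
//   "The configuration key 'spark.blacklist.enabled' has been deprecated
//    as of Spark 3.1.0 ... Please use spark.excludeOnFailure.enabled"
```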
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -98,10 +98,11 @@ case class FetchFailed(
/**
* Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
* 4 task failures, instead we immediately go back to the stage which generated the map output,
* and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
* presumably its not the fault of the executor where the task ran, but the executor which
* stored the data. This is especially important because we might rack up a bunch of
* fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
* and regenerate the missing data. (2) we don't count fetch failures from executors excluded
* due to too many task failures, since presumably it's not the fault of the executor where
* the task ran, but the executor which stored the data. This is especially important because
* we might rack up a bunch of fetch-failures in rapid succession, on all nodes of the cluster,
* due to one bad node.
*/
override def countTowardsTaskFailures: Boolean = false
}
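countTowardsTaskFailures is the hook that keeps fetch failures out of the excludeOnFailure bookkeeping. An illustrative check in the same spirit (this helper is a sketch, not the actual TaskSetManager logic):

```scala
import org.apache.spark.{TaskEndReason, TaskFailedReason}

// Only failure reasons that count toward task failures should feed the
// exclusion thresholds; FetchFailed overrides the flag to false.
def shouldRecordForExclusion(reason: TaskEndReason): Boolean = reason match {
  case failed: TaskFailedReason => failed.countTowardsTaskFailures
  case _ => false // e.g. Success is not a failure at all
}
```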
@@ -160,6 +160,8 @@ private[spark] class BasicEventFilter(
case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorExcluded => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorUnexcluded => liveExecutors.contains(e.executorId)
case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId)
case e: SparkListenerBlockManagerAdded => acceptBlockManagerEvent(e.blockManagerId)
case e: SparkListenerBlockManagerRemoved => acceptBlockManagerEvent(e.blockManagerId)
@@ -73,7 +73,7 @@ private[spark] class HistoryAppStatusStore(
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources,
source.resourceProfileId)
source.resourceProfileId, source.isExcluded, source.excludedInStages)
}

}