From a795cc8dcaca9974d88665e96b817aa3e3f7d9c3 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Tue, 29 Oct 2019 13:31:50 -0700 Subject: [PATCH 1/2] [SPARK-25299] add more scheduling logs --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++++ core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 1 + 2 files changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9f242585e9ddb..1033ffbf72d12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1526,6 +1526,7 @@ private[spark] class DAGScheduler( s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { failedStage.failedAttemptIds.add(task.stageAttemptId) + logInfo(s"Adding failed attemptId ${task.stageAttemptId} to stage ${failedStage}") val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest @@ -1547,8 +1548,11 @@ private[spark] class DAGScheduler( // Mark all the map as broken in the map stage, to ensure retry all the tasks on // resubmitted stage attempt. 
mapOutputTracker.unregisterAllMapOutput(shuffleId) + logInfo(s"Map stage ${mapStage} is a barrier, so unregister all map outputs") } else if (mapId != -1) { // Mark the map whose fetch failed as broken in the map stage + logInfo(s"Unregistering MapOutput " + + s"(shuffleId=$shuffleId, mapId=$mapId, bmAddr=$bmAddress)") mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 26cca334d3bd5..85d070e24ea72 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -101,6 +101,7 @@ private[scheduler] abstract class Stage( metrics.register(rdd.sparkContext) _latestInfo = StageInfo.fromStage( this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) + logInfo(s"Making new attemptId $nextAttemptId for stage $id") nextAttemptId += 1 } From 4a9e1cddbf4c262a53da930b19f970fa5ae48317 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Tue, 29 Oct 2019 13:53:01 -0700 Subject: [PATCH 2/2] logs --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 1033ffbf72d12..7db42ab3966d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1138,6 +1138,7 @@ private[spark] class DAGScheduler( } } catch { case NonFatal(e) => + logDebug("Creating new stage attempt due to exception", e) stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) @@ -1145,6 +1146,7 @@ private[spark] class
DAGScheduler( return } + logDebug("Creating new stage attempt") stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) // If there are tasks to execute, record the submission time of the stage. Otherwise,