diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index b8b63382fe4c..a596e93c6a68 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -731,12 +731,13 @@ object LogKeys {
   case object TARGET_PATH extends LogKey
   case object TASK_ATTEMPT_ID extends LogKey
   case object TASK_ID extends LogKey
+  case object TASK_INDEX extends LogKey
+  case object TASK_INFO_ID extends LogKey
   case object TASK_LOCALITY extends LogKey
   case object TASK_NAME extends LogKey
   case object TASK_REQUIREMENTS extends LogKey
   case object TASK_RESOURCES extends LogKey
   case object TASK_RESOURCE_ASSIGNMENTS extends LogKey
-  case object TASK_SET_ID extends LogKey
   case object TASK_SET_MANAGER extends LogKey
   case object TASK_SET_NAME extends LogKey
   case object TASK_STATE extends LogKey
@@ -751,7 +752,6 @@ object LogKeys {
   case object THREAD_POOL_SIZE extends LogKey
   case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
   case object THRESHOLD extends LogKey
-  case object TID extends LogKey
   case object TIME extends LogKey
   case object TIMEOUT extends LogKey
   case object TIMER extends LogKey
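Note: the sketch below is a plain-Scala stand-in, not Spark's actual logging internals, for why these key renames matter. In the structured logging framework each MDC key becomes a named field in the machine-readable log context, so the numeric TID keeps the TASK_ID key while a full Task description is logged under TASK_NAME. All names and the render format in the sketch are illustrative assumptions.

    // Minimal sketch (plain Scala): an MDC-style key names a field in the
    // structured log context, so TID and Task-description values must not
    // share a key. Key names and output format here are made up.
    object LogKeySketch {
      final case class LogKey(name: String)
      val TASK_ID   = LogKey("task_id")    // numeric TID only
      val TASK_NAME = LogKey("task_name")  // full task description

      def mdc(key: LogKey, value: Any): (String, String) = key.name -> value.toString

      def render(message: String, context: (String, String)*): String =
        s"$message ${context.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]")}"

      def main(args: Array[String]): Unit = {
        // The same logical task lands in different context fields under each key.
        println(render("Ignoring update for task.", mdc(TASK_ID, 42L)))
        println(render("Resubmitted task, marking it as still running.",
          mdc(TASK_NAME, "ShuffleMapTask(2, 0)")))
      }
    }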
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7c096dd110e5..f50e8bd25fec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2019,7 +2019,7 @@ private[spark] class DAGScheduler(
         val ignoreStageFailure = ignoreDecommissionFetchFailure &&
           isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
         if (ignoreStageFailure) {
-          logInfo(log"Ignoring fetch failure from ${MDC(TASK_ID, task)} of " +
+          logInfo(log"Ignoring fetch failure from ${MDC(TASK_NAME, task)} of " +
            log"${MDC(STAGE, failedStage)} attempt " +
            log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} when count " +
            log"${MDC(MAX_ATTEMPTS, config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} " +
@@ -2243,7 +2243,7 @@ private[spark] class DAGScheduler(
         // Always fail the current stage and retry all the tasks when a barrier task fail.
         val failedStage = stageIdToStage(task.stageId)
         if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
-          logInfo(log"Ignoring task failure from ${MDC(TASK_ID, task)} as it's from " +
+          logInfo(log"Ignoring task failure from ${MDC(TASK_NAME, task)} as it's from " +
            log"${MDC(FAILED_STAGE, failedStage)} attempt ${MDC(STAGE_ATTEMPT, task.stageAttemptId)} " +
            log"and there is a more recent attempt for that stage (attempt " +
            log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running")
@@ -2629,7 +2629,7 @@ private[spark] class DAGScheduler(
   }
 
   private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = {
-    logInfo(log"Resubmitted ${MDC(TASK_ID, task)}, so marking it as still running.")
+    logInfo(log"Resubmitted ${MDC(TASK_NAME, task)}, so marking it as still running.")
     stage match {
       case sms: ShuffleMapStage =>
         sms.pendingPartitions += task.partitionId
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ad0e0ddb687e..ec678256a708 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -250,8 +250,8 @@ private[spark] class TaskSchedulerImpl(
 
   override def submitTasks(taskSet: TaskSet): Unit = {
     val tasks = taskSet.tasks
-    logInfo(log"Adding task set ${MDC(LogKeys.TASK_SET_ID, taskSet.id)} with " +
-      log"${MDC(LogKeys.NUM_TASKS, tasks.length)} tasks resource profile " +
+    logInfo(log"Adding task set " + taskSet.logId +
+      log" with ${MDC(LogKeys.NUM_TASKS, tasks.length)} tasks resource profile " +
       log"${MDC(LogKeys.RESOURCE_PROFILE_ID, taskSet.resourceProfileId)}")
     this.synchronized {
       val manager = createTaskSetManager(taskSet, maxTaskFailures)
@@ -364,8 +364,9 @@ private[spark] class TaskSchedulerImpl(
     }
     noRejectsSinceLastReset -= manager.taskSet
     manager.parent.removeSchedulable(manager)
-    logInfo(log"Removed TaskSet ${MDC(LogKeys.TASK_SET_NAME, manager.taskSet.id)}, whose tasks " +
-      log"have all completed, from pool ${MDC(LogKeys.POOL_NAME, manager.parent.name)}")
+    logInfo(log"Removed TaskSet " + manager.taskSet.logId +
+      log" whose tasks have all completed, from pool ${MDC(LogKeys.POOL_NAME, manager.parent.name)}"
+    )
   }
 
   /**
@@ -817,7 +818,7 @@ private[spark] class TaskSchedulerImpl(
           }
         case None =>
           logError(log"Ignoring update with state ${MDC(LogKeys.TASK_STATE, state)} for " +
-            log"TID ${MDC(LogKeys.TID, tid)} because its task set is gone (this is " +
+            log"TID ${MDC(LogKeys.TASK_ID, tid)} because its task set is gone (this is " +
            log"likely the result of receiving duplicate task finished status updates) or its " +
            log"executor has been marked as failed.")
       }
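Note: the `taskSet.logId` splices above rely on `log"..."` fragments and a precomputed MessageWithContext concatenating into one message (logId itself is defined in TaskSet.scala below). The sketch that follows is a minimal plain-Scala stand-in, not Spark's MessageWithContext, showing the assumed merge semantics: concatenation joins both the message text and the key/value context.

    // Minimal sketch, assuming only plain Scala: a stand-in for a message
    // that carries structured context. `+` merges text and context, which is
    // what lets a precomputed logId be spliced between log fragments.
    final case class Msg(text: String, context: Map[String, String]) {
      def +(other: Msg): Msg = Msg(text + other.text, context ++ other.context)
    }

    object LogIdConcatSketch {
      // Plays the role of a log"..." fragment with no context of its own.
      def lit(s: String): Msg = Msg(s, Map.empty)

      def main(args: Array[String]): Unit = {
        // Stand-in for taskSet.logId: message "1.0" plus stage id/attempt context.
        val logId = Msg("1.0", Map("stage_id" -> "1", "stage_attempt" -> "0"))
        val line = lit("Adding task set ") + logId + lit(" with 8 tasks")
        println(line.text)    // Adding task set 1.0 with 8 tasks
        println(line.context) // Map(stage_id -> 1, stage_attempt -> 0)
      }
    }

Precomputing logId once on TaskSet means every call site attaches the same stage_id/stage_attempt pair, rather than each log line choosing its own key for the task set identifier.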
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index e03c4101709c..2474a1342eb2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -19,6 +19,9 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
+import org.apache.spark.internal.LogKeys.{STAGE_ATTEMPT, STAGE_ID}
+import org.apache.spark.internal.MessageWithContext
+
 /**
  * A set of tasks submitted together to the low-level TaskScheduler, usually representing
  * missing partitions of a particular stage.
@@ -34,4 +37,12 @@ private[spark] class TaskSet(
   val id: String = s"$stageId.$stageAttemptId"
 
   override def toString: String = "TaskSet " + id
+
+  // Identifier used in the structured logging framework.
+  lazy val logId: MessageWithContext = {
+    val hashMap = new java.util.HashMap[String, String]()
+    hashMap.put(STAGE_ID.name, stageId.toString)
+    hashMap.put(STAGE_ATTEMPT.name, stageAttemptId.toString)
+    MessageWithContext(id, hashMap)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6573ab2f23d6..7dba4a6dc8fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -989,9 +989,11 @@ private[spark] class TaskSetManager(
         if (ef.className == classOf[TaskOutputFileAlreadyExistException].getName) {
           // If we can not write to output file in the task, there's no point in trying to
           // re-execute it.
-          logError(log"Task ${MDC(TASK_ID, info.id)} in stage ${MDC(STAGE_ID, taskSet.id)} " +
-            log"(TID ${MDC(TID, tid)}) can not write to output file: " +
-            log"${MDC(ERROR, ef.description)}; not retrying")
+          logError(
+            log"Task ${MDC(TASK_INDEX, info.index)}.${MDC(TASK_ATTEMPT_ID, info.attemptNumber)} " +
+              log"in stage ${MDC(STAGE_ID, taskSet.stageId)}." +
+              log"${MDC(STAGE_ATTEMPT, taskSet.stageAttemptId)} (TID ${MDC(TASK_ID, tid)}) " +
+              log"can not write to output file: ${MDC(ERROR, ef.description)}; not retrying")
           emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null,
             accumUpdates, metricPeaks)
           abort("Task %s in stage %s (TID %d) can not write to output file: %s".format(
@@ -1057,8 +1059,8 @@ private[spark] class TaskSetManager(
             info.host, info.executorId, index, failureReasonString))
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
-        logError(log"Task ${MDC(TASK_ID, index)} in stage ${MDC(STAGE_ID, taskSet.id)} failed " +
-          log"${MDC(MAX_ATTEMPTS, maxTaskFailures)} times; aborting job")
+        logError(log"Task ${MDC(TASK_INDEX, index)} in stage " + taskSet.logId +
+          log" failed ${MDC(MAX_ATTEMPTS, maxTaskFailures)} times; aborting job")
         abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
           .format(index, taskSet.id, maxTaskFailures, failureReasonString), failureException)
         return
@@ -1252,8 +1254,8 @@ private[spark] class TaskSetManager(
     if (speculated) {
       addPendingTask(index, speculatable = true)
       logInfo(
-        log"Marking task ${MDC(INDEX, index)} in stage ${MDC(STAGE_ID, taskSet.id)} (on " +
-        log"${MDC(HOST, info.host)}) as speculatable because it ran more than " +
+        log"Marking task ${MDC(TASK_INDEX, index)} in stage " + taskSet.logId +
+        log" (on ${MDC(HOST, info.host)}) as speculatable because it ran more than " +
         log"${MDC(TIMEOUT, threshold)} ms(${MDC(NUM_TASKS, speculatableTasks.size + 1)}" +
         log"speculatable tasks in this taskset now)")
       speculatableTasks += index
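Note: after these changes a task is identified in TaskSetManager messages as its index plus task attempt within a stage id plus stage attempt, alongside the global TID. The snippet below only illustrates the resulting human-readable shape; all values are made up.

    // Illustration only: the assumed rendered form of the new identifiers,
    // "Task <index>.<taskAttempt> in stage <stageId>.<stageAttempt> (TID <tid>)".
    object TaskLogFormatSketch {
      def main(args: Array[String]): Unit = {
        val (index, attempt, stageId, stageAttempt, tid) = (3, 0, 1, 0, 42)
        println(s"Task $index.$attempt in stage $stageId.$stageAttempt (TID $tid) " +
          "failed 4 times; aborting job")
      }
    }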