@@ -731,12 +731,13 @@ object LogKeys {
   case object TARGET_PATH extends LogKey
   case object TASK_ATTEMPT_ID extends LogKey
   case object TASK_ID extends LogKey
+  case object TASK_INDEX extends LogKey
+  case object TASK_INFO_ID extends LogKey
   case object TASK_LOCALITY extends LogKey
   case object TASK_NAME extends LogKey
   case object TASK_REQUIREMENTS extends LogKey
   case object TASK_RESOURCES extends LogKey
   case object TASK_RESOURCE_ASSIGNMENTS extends LogKey
-  case object TASK_SET_ID extends LogKey
   case object TASK_SET_MANAGER extends LogKey
   case object TASK_SET_NAME extends LogKey
   case object TASK_STATE extends LogKey
@@ -751,7 +752,6 @@ object LogKeys {
   case object THREAD_POOL_SIZE extends LogKey
   case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
   case object THRESHOLD extends LogKey
-  case object TID extends LogKey
   case object TIME extends LogKey
   case object TIMEOUT extends LogKey
   case object TIMER extends LogKey
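For orientation: these LogKey case objects name the fields that Spark's structured logging attaches to a message, and the rest of this diff swaps several of them at their call sites. A minimal sketch of the call pattern, assuming the Logging trait and MDC wrapper from org.apache.spark.internal as used throughout this diff (the surrounding class is illustrative, not part of the PR):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.TASK_ID

    class StatusReporter extends Logging {
      def report(tid: Long): Unit = {
        // Each MDC(key, value) interpolation becomes a named field in the
        // structured output; TASK_ID is what call sites use now that the
        // narrower TID key is removed.
        logInfo(log"Finished TID ${MDC(TASK_ID, tid)}")
      }
    }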
@@ -2019,7 +2019,7 @@ private[spark] class DAGScheduler(
       val ignoreStageFailure = ignoreDecommissionFetchFailure &&
         isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
       if (ignoreStageFailure) {
-        logInfo(log"Ignoring fetch failure from ${MDC(TASK_ID, task)} of " +
+        logInfo(log"Ignoring fetch failure from ${MDC(TASK_NAME, task)} of " +
           log"${MDC(STAGE, failedStage)} attempt " +
           log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} when count " +
           log"${MDC(MAX_ATTEMPTS, config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} " +

[Review thread on the TASK_NAME change]

Contributor:
Do we really want to call this TASK_NAME? From the code below, it seems inconsistent:

    log"${MDC(TASK_NAME, taskName(attemptInfo.taskId))} on " +

    def taskName(tid: Long): String = {
      val info = taskInfos.get(tid)
      assert(info.isDefined, s"Can not find TaskInfo for task (TID $tid)")
      s"task ${info.get.id} in stage ${taskSet.id} (TID $tid)"
    }

Member Author:
Well, I can't find a better name here...
@@ -2243,7 +2243,7 @@ private[spark] class DAGScheduler(
       // Always fail the current stage and retry all the tasks when a barrier task fail.
       val failedStage = stageIdToStage(task.stageId)
       if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
-        logInfo(log"Ignoring task failure from ${MDC(TASK_ID, task)} as it's from " +
+        logInfo(log"Ignoring task failure from ${MDC(TASK_NAME, task)} as it's from " +
           log"${MDC(FAILED_STAGE, failedStage)} attempt ${MDC(STAGE_ATTEMPT, task.stageAttemptId)} " +
           log"and there is a more recent attempt for that stage (attempt " +
           log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running")
@@ -2629,7 +2629,7 @@ private[spark] class DAGScheduler(
   }

   private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = {
-    logInfo(log"Resubmitted ${MDC(TASK_ID, task)}, so marking it as still running.")
+    logInfo(log"Resubmitted ${MDC(TASK_NAME, task)}, so marking it as still running.")
     stage match {
       case sms: ShuffleMapStage =>
         sms.pendingPartitions += task.partitionId
@@ -250,8 +250,8 @@ private[spark] class TaskSchedulerImpl(

   override def submitTasks(taskSet: TaskSet): Unit = {
     val tasks = taskSet.tasks
-    logInfo(log"Adding task set ${MDC(LogKeys.TASK_SET_ID, taskSet.id)} with " +
-      log"${MDC(LogKeys.NUM_TASKS, tasks.length)} tasks resource profile " +
+    logInfo(log"Adding task set " + taskSet.logId +
+      log" with ${MDC(LogKeys.NUM_TASKS, tasks.length)} tasks resource profile " +
       log"${MDC(LogKeys.RESOURCE_PROFILE_ID, taskSet.resourceProfileId)}")
     this.synchronized {
       val manager = createTaskSetManager(taskSet, maxTaskFailures)
@@ -364,8 +364,9 @@ private[spark] class TaskSchedulerImpl(
     }
     noRejectsSinceLastReset -= manager.taskSet
     manager.parent.removeSchedulable(manager)
-    logInfo(log"Removed TaskSet ${MDC(LogKeys.TASK_SET_NAME, manager.taskSet.id)}, whose tasks " +
-      log"have all completed, from pool ${MDC(LogKeys.POOL_NAME, manager.parent.name)}")
+    logInfo(log"Removed TaskSet " + manager.taskSet.logId +
+      log" whose tasks have all completed, from pool ${MDC(LogKeys.POOL_NAME, manager.parent.name)}"
+    )
   }

   /**
@@ -817,7 +818,7 @@ private[spark] class TaskSchedulerImpl(
         }
       case None =>
         logError(log"Ignoring update with state ${MDC(LogKeys.TASK_STATE, state)} for " +
-          log"TID ${MDC(LogKeys.TID, tid)} because its task set is gone (this is " +
+          log"TID ${MDC(LogKeys.TASK_ID, tid)} because its task set is gone (this is " +
           log"likely the result of receiving duplicate task finished status updates) or its " +
           log"executor has been marked as failed.")
     }
core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala (11 additions, 0 deletions)
@@ -19,6 +19,9 @@ package org.apache.spark.scheduler

 import java.util.Properties
 
+import org.apache.spark.internal.LogKeys.{STAGE_ATTEMPT, STAGE_ID}
+import org.apache.spark.internal.MessageWithContext
+
 /**
  * A set of tasks submitted together to the low-level TaskScheduler, usually representing
  * missing partitions of a particular stage.
@@ -34,4 +37,12 @@ private[spark] class TaskSet(
   val id: String = s"$stageId.$stageAttemptId"
 
   override def toString: String = "TaskSet " + id
+
+  // Identifier used in the structured logging framework.
+  lazy val logId: MessageWithContext = {
+    val hashMap = new java.util.HashMap[String, String]()
+    hashMap.put(STAGE_ID.name, stageId.toString)
+    hashMap.put(STAGE_ATTEMPT.name, stageAttemptId.toString)
+    MessageWithContext(id, hashMap)
+  }
 }

[Review thread on logId]

Contributor:
So we intentionally do not treat it as a log parameter?

Member Author:
What do you mean by "log parameter"?

Contributor:
Never mind, it's a MessageWithContext.
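Because logId is a prebuilt MessageWithContext rather than an MDC parameter, call sites compose it with the log interpolator using +, which concatenates the message text and merges the context maps, as the call sites in this diff rely on. A sketch of the intended pattern; the wrapper class and method here are hypothetical:

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys

    class SubmitLogger(taskSet: TaskSet) extends Logging {
      def announce(numTasks: Int): Unit = {
        // Plain text: "Adding task set 4.0 with 8 tasks". In structured
        // output, taskSet.logId also contributes STAGE_ID and STAGE_ATTEMPT.
        logInfo(log"Adding task set " + taskSet.logId +
          log" with ${MDC(LogKeys.NUM_TASKS, numTasks)} tasks")
      }
    }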
@@ -989,9 +989,11 @@ private[spark] class TaskSetManager(
       if (ef.className == classOf[TaskOutputFileAlreadyExistException].getName) {
         // If we can not write to output file in the task, there's no point in trying to
         // re-execute it.
-        logError(log"Task ${MDC(TASK_ID, info.id)} in stage ${MDC(STAGE_ID, taskSet.id)} " +
-          log"(TID ${MDC(TID, tid)}) can not write to output file: " +
-          log"${MDC(ERROR, ef.description)}; not retrying")
+        logError(
+          log"Task ${MDC(TASK_INDEX, info.index)}.${MDC(TASK_ATTEMPT_ID, info.attemptNumber)} " +
+          log"in stage ${MDC(STAGE_ID, taskSet.stageId)}." +
+          log"${MDC(STAGE_ATTEMPT, taskSet.stageAttemptId)} (TID ${MDC(TASK_ID, tid)}) " +
+          log"can not write to output file: ${MDC(ERROR, ef.description)}; not retrying")
         emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null,
           accumUpdates, metricPeaks)
         abort("Task %s in stage %s (TID %d) can not write to output file: %s".format(

[Review thread on this logError rewrite]

Contributor:
Can we unify the writing style with taskSet.logId? (screenshot omitted)
The current writing style is more consistent with traditional styles.

Member Author:
This one only shows up once... I was thinking about not adding a new method.
@@ -1057,8 +1059,8 @@ private[spark] class TaskSetManager(
           info.host, info.executorId, index, failureReasonString))
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
-        logError(log"Task ${MDC(TASK_ID, index)} in stage ${MDC(STAGE_ID, taskSet.id)} failed " +
-          log"${MDC(MAX_ATTEMPTS, maxTaskFailures)} times; aborting job")
+        logError(log"Task ${MDC(TASK_INDEX, index)} in stage " + taskSet.logId +
+          log" failed ${MDC(MAX_ATTEMPTS, maxTaskFailures)} times; aborting job")
         abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
           .format(index, taskSet.id, maxTaskFailures, failureReasonString), failureException)
         return
@@ -1252,8 +1254,8 @@ private[spark] class TaskSetManager(
     if (speculated) {
       addPendingTask(index, speculatable = true)
       logInfo(
-        log"Marking task ${MDC(INDEX, index)} in stage ${MDC(STAGE_ID, taskSet.id)} (on " +
-        log"${MDC(HOST, info.host)}) as speculatable because it ran more than " +
+        log"Marking task ${MDC(TASK_INDEX, index)} in stage " + taskSet.logId +
+        log" (on ${MDC(HOST, info.host)}) as speculatable because it ran more than " +
         log"${MDC(TIMEOUT, threshold)} ms(${MDC(NUM_TASKS, speculatableTasks.size + 1)}" +
         log"speculatable tasks in this taskset now)")
       speculatableTasks += index