@@ -21,6 +21,7 @@ package org.apache.spark.internal
* All structured logging keys should be defined here for standardization.
*/
object LogKey extends Enumeration {
val ACCUMULATOR_ID = Value
val APP_DESC = Value
val APP_ID = Value
val APP_STATE = Value
@@ -33,40 +34,56 @@ object LogKey extends Enumeration {
val CLASS_NAME = Value
val COMMAND = Value
val COMMAND_OUTPUT = Value
val COMPONENT = Value
val CONFIG = Value
val CONFIG2 = Value
val CONTAINER_ID = Value
val COUNT = Value
val DRIVER_ID = Value
val END_POINT = Value
val ERROR = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXECUTOR_ID = Value
val EXECUTOR_STATE_CHANGED = Value
val EXECUTOR_STATE = Value
val EXIT_CODE = Value
val FAILURES = Value
val HOST = Value
val JOB_ID = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
val LISTENER = Value
val LOG_TYPE = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
val MAX_CATEGORIES = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MERGE_DIR_NAME = Value
val METHOD_NAME = Value
val MIN_SIZE = Value
val NUM_ITERATIONS = Value
val OBJECT_ID = Value
val OLD_BLOCK_MANAGER_ID = Value
val OPTIMIZER_CLASS_NAME = Value
val PARTITION_ID = Value
val PATH = Value
val PATHS = Value
val POD_ID = Value
val PORT = Value
val RANGE = Value
val RDD_ID = Value
val REASON = Value
val REDUCE_ID = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
val RPC_ADDRESS = Value
val SHUFFLE_BLOCK_INFO = Value
val SHUFFLE_ID = Value
val SHUFFLE_MERGE_ID = Value
val SIZE = Value
val SLEEP_TIME_SECONDS = Value
Review comment (Contributor):
Do we need the suffix _SECONDS? Will we encounter SLEEP_TIME_MILLISECONDS like this in the future?

Reply (Member, Author):
We can handle this when there is SLEEP_TIME_MILLISECONDS.

val STAGE_ID = Value
val SUBMISSION_ID = Value
val SUBSAMPLING_RATE = Value
@@ -75,8 +92,12 @@ object LogKey extends Enumeration {
val TASK_NAME = Value
val TASK_SET_NAME = Value
val TASK_STATE = Value
val THREAD = Value
val THREAD_NAME = Value
val TID = Value
val TIMEOUT = Value
val URI = Value
val USER_NAME = Value
val WORKER_URL = Value

type LogKey = Value
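
A note on how these keys are consumed: classes that mix in Logging get the log"..." interpolator, and each MDC(key, value) pair inside it travels with the log event as structured context rather than being flattened into plain text, as the files later in this diff show. A minimal sketch (ExecutorMonitor and its message are illustrative, not from this PR):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.EXECUTOR_ID

    // Hypothetical class, for illustration; mixing in Logging provides
    // logError and the log"..." interpolator used throughout this PR.
    class ExecutorMonitor extends Logging {
      def onExecutorLost(executorId: String, e: Throwable): Unit = {
        // EXECUTOR_ID rides along as a queryable key-value pair, and the
        // throwable still supplies the stack trace.
        logError(log"Lost executor ${MDC(EXECUTOR_ID, executorId)}", e)
      }
    }
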
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -24,7 +24,8 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled
import scala.jdk.CollectionConverters._

import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, BROADCAST_ID, LISTENER, RDD_ID, SHUFFLE_ID}
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.scheduler.SparkListener
@@ -226,7 +227,7 @@ private[spark] class ContextCleaner(
listeners.asScala.foreach(_.rddCleaned(rddId))
logDebug("Cleaned RDD " + rddId)
} catch {
case e: Exception => logError("Error cleaning RDD " + rddId, e)
case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, rddId)}", e)
}
}

@@ -245,7 +246,7 @@
logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
}
} catch {
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e)
}
}

@@ -257,7 +258,8 @@
listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
case e: Exception =>
logError(log"Error cleaning broadcast ${MDC(BROADCAST_ID, broadcastId)}", e)
}
}

@@ -269,7 +271,8 @@
listeners.asScala.foreach(_.accumCleaned(accId))
logDebug("Cleaned accumulator " + accId)
} catch {
case e: Exception => logError("Error cleaning accumulator " + accId, e)
case e: Exception =>
logError(log"Error cleaning accumulator ${MDC(ACCUMULATOR_ID, accId)}", e)
}
}

@@ -285,7 +288,8 @@
logDebug("Cleaned rdd checkpoint data " + rddId)
}
catch {
case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
case e: Exception =>
logError(log"Error cleaning rdd checkpoint data ${MDC(RDD_ID, rddId)}", e)
}
}

@@ -295,7 +299,8 @@
sc.listenerBus.removeListener(listener)
logDebug(s"Cleaned Spark listener $listener")
} catch {
case e: Exception => logError(s"Error cleaning Spark listener $listener", e)
case e: Exception =>
logError(log"Error cleaning Spark listener ${MDC(LISTENER, listener)}", e)
}
}

@@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerMaster(
.getOrElse(Seq.empty[BlockManagerId]))
}
} catch {
-case NonFatal(e) => logError(e.getMessage, e)
+case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e)
}
}
} catch {
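
When no domain-specific key fits, the change above falls back to wrapping the exception text itself in the newly added generic ERROR key. A sketch of that fallback shape (LookupHelper and riskyLookup are hypothetical stand-ins):

    import scala.util.control.NonFatal

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.ERROR

    object LookupHelper extends Logging {
      // Hypothetical stand-in for any call that may throw.
      private def riskyLookup(): Unit = throw new IllegalStateException("boom")

      def safeLookup(): Unit =
        try riskyLookup()
        catch {
          // The message text becomes the structured value under ERROR;
          // the throwable still goes in as the second argument.
          case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e)
        }
    }
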
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -46,7 +46,8 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
@@ -748,7 +749,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
} catch {
case e: Exception =>
logError(s"Exception getting thread dump from executor $executorId", e)
logError(
log"Exception getting thread dump from executor ${MDC(LogKey.EXECUTOR_ID, executorId)}",
e)
None
}
}
@@ -778,7 +781,9 @@
}
} catch {
case e: Exception =>
logError(s"Exception getting heap histogram from executor $executorId", e)
logError(
log"Exception getting heap histogram from " +
log"executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", e)
None
}
}
@@ -2140,7 +2145,7 @@ class SparkContext(config: SparkConf) extends Logging {
Seq(env.rpcEnv.fileServer.addJar(file))
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e)
Nil
}
}
@@ -2161,7 +2166,7 @@
Seq(path)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e)
Nil
}
} else {
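
The heap-histogram change above splits one message across two log"..." fragments joined with +, which concatenates the text and merges the MDC context from both sides. A sketch of the same idiom in isolation (FetchReporter is hypothetical):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{HOST, PORT}

    object FetchReporter extends Logging {
      def reportFetchFailure(host: String, port: Int, e: Throwable): Unit =
        // `+` joins the fragments, keeping long structured messages within
        // source line-length limits without losing any context.
        logError(log"Failed to fetch block data from " +
          log"${MDC(HOST, host)}:${MDC(PORT, port)}", e)
    }
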
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.LISTENER
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
@@ -246,7 +247,7 @@ private[spark] class TaskContextImpl(
}
}
listenerExceptions += e
logError(s"Error in $name", e)
logError(log"Error in ${MDC(LISTENER, name)}", e)
}
}
if (listenerExceptions.nonEmpty) {
@@ -26,7 +26,8 @@ import io.netty.handler.timeout.ReadTimeoutException

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.api.r.SerDe._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{METHOD_NAME, OBJECT_ID}
import org.apache.spark.internal.config.R._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._
@@ -76,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend)
writeObject(dos, null, server.jvmObjectTracker)
} catch {
case e: Exception =>
logError(s"Removing $objId failed", e)
logError(log"Removing ${MDC(OBJECT_ID, objId)} failed", e)
writeInt(dos, -1)
writeString(dos, s"Removing $objId failed: ${e.getMessage}")
}
@@ -192,7 +193,7 @@
}
} catch {
case e: Exception =>
logError(s"$methodName on $objId failed", e)
logError(log"${MDC(METHOD_NAME, methodName)} on ${MDC(OBJECT_ID, objId)} failed", e)
writeInt(dos, -1)
// Writing the error message of the cause for the exception. This will be returned
// to user in the R process.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS}
+import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
@@ -61,7 +61,7 @@ private class ClientEndpoint(
t => t match {
case ie: InterruptedException => // Exit normally
case e: Throwable =>
-logError(e.getMessage, e)
+logError(log"${MDC(ERROR, e.getMessage)}", e)
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
})

@@ -27,7 +27,8 @@ import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.COMPONENT
import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
@@ -103,9 +104,10 @@ private[spark] object StandaloneResourceUtils extends Logging {
writeResourceAllocationJson(allocations, tmpFile)
} catch {
case NonFatal(e) =>
val errMsg = s"Exception threw while preparing resource file for $compShortName"
val errMsg =
log"Exception threw while preparing resource file for ${MDC(COMPONENT, compShortName)}"
logError(errMsg, e)
throw new SparkException(errMsg, e)
throw new SparkException(errMsg.message, e)
}
val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir)
tmpFile.renameTo(resourcesFile)
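
The errMsg change above is the one place in this diff where a structured entry is reused outside of logging: the val now holds a log entry, so the SparkException is built from its rendered text via .message (an accessor taken from the diff itself). The pattern in isolation, with a hypothetical wrapper:

    import org.apache.spark.SparkException
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.COMPONENT

    object ResourcePreparer extends Logging {
      // Hypothetical wrapper around the failure path shown in the diff.
      def failPreparation(compShortName: String, e: Throwable): Nothing = {
        val errMsg =
          log"Exception threw while preparing resource file for ${MDC(COMPONENT, compShortName)}"
        logError(errMsg, e)             // logged with COMPONENT as structured context
        throw new SparkException(errMsg.message, e) // exceptions need the plain text
      }
    }
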
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Utils.scala
@@ -22,7 +22,8 @@ import java.io.File
import jakarta.servlet.http.HttpServletRequest

import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH}
import org.apache.spark.ui.JettyUtils.createServletHandler
import org.apache.spark.ui.WebUI
import org.apache.spark.util.Utils.{getFileLength, offsetBytes}
@@ -95,7 +96,8 @@ private[deploy] object Utils extends Logging {
(logText, startIndex, endIndex, totalLength)
} catch {
case e: Exception =>
logError(s"Error getting $logType logs from directory $logDirectory", e)
logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " +
log"directory ${MDC(PATH, logDirectory)}", e)
("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
}
}
@@ -37,7 +37,8 @@ import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
@@ -920,7 +921,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: AccessControlException =>
logWarning(s"Insufficient permission while compacting log for $rootPath", e)
case e: Exception =>
logError(s"Exception while compacting log for $rootPath", e)
logError(log"Exception while compacting log for ${MDC(PATH, rootPath)}", e)
} finally {
endProcessing(rootPath)
}
@@ -1402,7 +1403,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe)
}
}
deleted