43 changes: 41 additions & 2 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -21,17 +21,56 @@ package org.apache.spark.internal
* All structured logging keys should be defined here for standardization.
*/
object LogKey extends Enumeration {
val APPLICATION_ID = Value
val APPLICATION_STATE = Value
val APP_DESC = Value
val APP_ID = Value
val APP_STATE = Value
val BLOCK_ID = Value
val BLOCK_MANAGER_ID = Value
val BROADCAST_ID = Value
val BUCKET = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
val COMMAND = Value
val COMMAND_OUTPUT = Value
val CONFIG = Value
val CONFIG2 = Value
val CONTAINER_ID = Value
val COUNT = Value
val DRIVER_ID = Value
val ERROR = Value
val EVENT_QUEUE = Value
val EXECUTOR_ID = Value
val EXECUTOR_STATE_CHANGED = Value
val EXIT_CODE = Value
val HOST = Value
val JOB_ID = Value
val LINE = Value
val LINE_NUM = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MIN_SIZE = Value
val OLD_BLOCK_MANAGER_ID = Value
val PARTITION_ID = Value
val PATH = Value
val POD_ID = Value
val REASON = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
val RPC_ADDRESS = Value
val SHUFFLE_ID = Value
val SIZE = Value
val STAGE_ID = Value
val SUBMISSION_ID = Value
val TASK_ATTEMPT_ID = Value
val TASK_ID = Value
val TASK_NAME = Value
val TASK_SET_NAME = Value
val TASK_STATE = Value
val TID = Value
val TIMEOUT = Value
val WORKER_URL = Value

type LogKey = Value
}
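
The keys above are referenced through the MDC wrapper together with the log"..." interpolator that the files below switch to. Below is a minimal sketch of the calling pattern, assuming only the Logging trait and MDC import shown in this PR; the class and message text are invented purely for illustration:

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST}

// Hypothetical component, used only to illustrate the calling convention.
private class ExampleComponent extends Logging {
  def reportLostExecutor(execId: String, host: String): Unit = {
    // Each interpolated value is wrapped in MDC with a key from LogKey, so it is
    // emitted both in the rendered message and as structured metadata.
    logError(log"Lost executor ${MDC(EXECUTOR_ID, execId)} on host ${MDC(HOST, host)}")
  }
}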
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -34,7 +34,8 @@ import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOut
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -725,11 +726,12 @@ private[spark] class MapOutputTrackerMaster(
// Make sure that we aren't going to exceed the max RPC message size by making sure
// we use broadcast to send large map output statuses.
if (minSizeForBroadcast > maxRpcMessageSize) {
val msg = s"${SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key} ($minSizeForBroadcast bytes) " +
s"must be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an " +
"rpc message that is too large."
logError(msg)
throw new IllegalArgumentException(msg)
val logEntry = log"${MDC(CONFIG, SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key)} " +
log"(${MDC(MIN_SIZE, minSizeForBroadcast)} bytes) " +
log"must be <= spark.rpc.message.maxSize (${MDC(MAX_SIZE, maxRpcMessageSize)} " +
log"bytes) to prevent sending an rpc message that is too large."
logError(logEntry)
throw new IllegalArgumentException(logEntry.message)
}

def post(message: MapOutputTrackerMasterMessage): Unit = {
@@ -815,7 +817,7 @@ private[spark] class MapOutputTrackerMaster(
case None if shuffleMigrationEnabled =>
logWarning(s"Asked to update map output for unknown shuffle ${shuffleId}")
case None =>
logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
logError(log"Asked to update map output for unknown shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
}
}

@@ -1736,9 +1738,11 @@ private[spark] object MapOutputTracker extends Logging {

def validateStatus(status: ShuffleOutputStatus, shuffleId: Int, partition: Int) : Unit = {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId partition $partition"
// scalastyle:off line.size.limit
Contributor: Can we split it into multiple lines, to avoid using // scalastyle:off line.size.limit?

Member (author): If the log is not long, one line makes it easier to read. I suggest we allow both styles.

val errorMessage = log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)} partition ${MDC(PARTITION_ID, partition)}"
// scalastyle:on
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, partition, errorMessage)
throw new MetadataFetchFailedException(shuffleId, partition, errorMessage.message)
}
}
}
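
As an aside to the review thread above: a split-line version of the same message, built by concatenating log"..." fragments the way the MapOutputTrackerMaster change earlier in this file does, would avoid the scalastyle suppression. This is a sketch only, using the same identifiers as the surrounding code, not part of the PR:

val errorMessage = log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)} " +
  log"partition ${MDC(PARTITION_ID, partition)}"
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, partition, errorMessage.message)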
@@ -23,7 +23,8 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CLASS_NAME, PATH}

/**
* Process that starts a Py4J server on an ephemeral port.
@@ -40,7 +41,7 @@ private[spark] object PythonGatewayServer extends Logging {
gatewayServer.start()
val boundPort: Int = gatewayServer.getListeningPort
if (boundPort == -1) {
logError(s"${gatewayServer.server.getClass} failed to bind; exiting")
logError(log"${MDC(CLASS_NAME, gatewayServer.server.getClass)} failed to bind; exiting")
System.exit(1)
} else {
val address = InetAddress.getLoopbackAddress()
@@ -62,7 +63,7 @@
dos.close()

if (!tmpPath.renameTo(connectionInfoPath)) {
logError(s"Unable to write connection information to $connectionInfoPath.")
logError(log"Unable to write connection information to ${MDC(PATH, connectionInfoPath)}.")
System.exit(1)
}

@@ -25,7 +25,8 @@ import org.apache.hadoop.io._

import org.apache.spark.SparkException
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.CLASS_NAME
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -49,7 +50,7 @@ private[python] object Converter extends Logging {
} match {
case Success(c) => c
case Failure(err) =>
logError(s"Failed to load converter: $cc")
logError(log"Failed to load converter: ${MDC(CLASS_NAME, cc)}")
throw err
}
}.getOrElse { defaultConverter }
@@ -27,7 +27,8 @@ import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKey.BROADCAST_ID
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
@@ -176,7 +177,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
blocks.length
} catch {
case t: Throwable =>
logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
// scalastyle:off line.size.limit
logError(log"Store broadcast ${MDC(BROADCAST_ID, broadcastId)} fail, remove all pieces of the broadcast")
// scalastyle:on
blockManager.removeBroadcast(id, tellMaster = true)
throw t
}
16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -31,7 +31,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
@@ -173,8 +174,7 @@ private class ClientEndpoint(
// Exception, if present
exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
logError("Exception from cluster", e)
System.exit(-1)
case _ =>
state.get match {
@@ -197,7 +197,7 @@
} else if (exception.exists(e => Utils.responseFromBackup(e.getMessage))) {
logDebug(s"The status response is reported from a backup spark instance. So, ignored.")
} else {
logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
logError(log"ERROR: Cluster master did not recognize ${MDC(DRIVER_ID, submittedDriverID)}")
System.exit(-1)
}
}
@@ -226,7 +226,7 @@

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
logError(s"Error connecting to master $remoteAddress.")
logError(log"Error connecting to master ${MDC(RPC_ADDRESS, remoteAddress)}.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
@@ -240,8 +240,7 @@

override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
logError(s"Error connecting to master ($remoteAddress).")
logError(s"Cause was: $cause")
logError(log"Error connecting to master (${MDC(RPC_ADDRESS, remoteAddress)}).", cause)
lostMasters += remoteAddress
if (lostMasters.size >= masterEndpoints.size) {
logError("No master is available, exiting.")
@@ -251,8 +250,7 @@
}

override def onError(cause: Throwable): Unit = {
logError(s"Error processing messages, exiting.")
cause.printStackTrace()
logError("Error processing messages, exiting.", cause)
System.exit(-1)
}

9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.CLASS_NAME
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.launcher.SparkLauncher
@@ -966,7 +967,7 @@ private[spark] class SparkSubmit extends Logging {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
logError(s"Failed to load class $childMainClass.")
logError(log"Failed to load class ${MDC(CLASS_NAME, childMainClass)}.")
if (childMainClass.contains("thriftserver")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
@@ -977,7 +978,7 @@
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
logError(s"Failed to load $childMainClass: ${e.getMessage()}")
logError(log"Failed to load ${MDC(CLASS_NAME, childMainClass)}", e)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
logInfo(s"Failed to load hive class.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
@@ -1013,7 +1014,7 @@
try {
SparkContext.getActive.foreach(_.stop())
} catch {
case e: Throwable => logError(s"Failed to close SparkContext: $e")
case e: Throwable => logError("Failed to close SparkContext", e)
}
}
}
@@ -23,7 +23,8 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.deploy.history.EventFilter.FilterStatistics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{LINE, LINE_NUM, PATH}
import org.apache.spark.scheduler._
import org.apache.spark.util.{JsonProtocol, Utils}

@@ -98,8 +99,8 @@ private[spark] object EventFilter extends Logging {
}
} catch {
case e: Exception =>
logError(s"Exception parsing Spark event log: ${path.getName}", e)
logError(s"Malformed line #$lineNum: $line\n")
logError(log"Exception parsing Spark event log: ${MDC(PATH, path.getName)}", e)
logError(log"Malformed line #${MDC(LINE_NUM, lineNum)}: ${MDC(LINE, line)}\n")
throw e
}
}
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -32,7 +32,8 @@ import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{APP_DESC, APP_ID, EXECUTOR_ID, RETRY_COUNT}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy._
@@ -568,8 +569,9 @@ private[deploy] class Master(
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
logError(log"Application ${MDC(APP_DESC, appInfo.desc.name)} " +
log"with ID ${MDC(APP_ID, appInfo.id)} " +
log"failed ${MDC(RETRY_COUNT, appInfo.retryCount)} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
@@ -1245,7 +1247,9 @@
Some(executorId.toInt)
} catch {
case e: NumberFormatException =>
logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
// scalastyle:off line.size.limit
logError(log"Encountered executor with a non-integer ID: ${MDC(EXECUTOR_ID, executorId)}. Ignoring")
// scalastyle:on
None
}
}
@@ -33,7 +33,8 @@ import jakarta.servlet.http.HttpServletResponse

import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CLASS_NAME, ERROR, SUBMISSION_ID}
import org.apache.spark.util.Utils

/**
@@ -329,7 +330,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
!connection.getContentType().contains("application/json")) {
throw new SubmitRestProtocolException(s"Server responded with exception:\n${errString}")
}
logError(s"Server responded with error:\n${errString}")
logError(log"Server responded with error:\n${MDC(ERROR, errString)}")
val error = new ErrorResponse
if (responseCode == RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION) {
error.highestProtocolVersion = RestSubmissionServer.PROTOCOL_VERSION
@@ -350,7 +351,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
response match {
// If the response is an error, log the message
case error: ErrorResponse =>
logError(s"Server responded with error:\n${error.message}")
logError(log"Server responded with error:\n${MDC(ERROR, error.message)}")
error
// Otherwise, simply return the response
case response: SubmitRestProtocolResponse => response
@@ -447,7 +448,7 @@
}
} else {
val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
logError(s"Application submission failed$failMessage")
logError(log"Application submission failed${MDC(ERROR, failMessage)}")
}
}

@@ -470,20 +471,21 @@
// Log driver state, if present
driverState match {
case Some(state) => logInfo(s"State of driver $submissionId is now $state.")
case _ => logError(s"State of driver $submissionId was not found!")
case _ =>
logError(log"State of driver ${MDC(SUBMISSION_ID, submissionId)} was not found!")
}
// Log worker node, if present
(workerId, workerHostPort) match {
case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.")
case _ =>
}
// Log exception stack trace, if present
exception.foreach { e => logError(e) }
exception.foreach { e => logError(log"${MDC(ERROR, e)}") }
return
}
Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
}
logError(s"Error: Master did not recognize driver $submissionId.")
logError(log"Error: Master did not recognize driver ${MDC(SUBMISSION_ID, submissionId)}.")
}

/** Log the response sent by the server in the REST application submission protocol. */
@@ -493,7 +495,9 @@

/** Log an appropriate error if the response sent by the server is not of the expected type. */
private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
// scalastyle:off line.size.limit
logError(log"Error: Server responded with message of unexpected type ${MDC(CLASS_NAME, unexpected.messageType)}.")
// scalastyle:on
}

/**