diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 86ea648d47c1..608c0c6d521e 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -21,17 +21,56 @@ package org.apache.spark.internal
  * All structured logging keys should be defined here for standardization.
  */
 object LogKey extends Enumeration {
-  val APPLICATION_ID = Value
-  val APPLICATION_STATE = Value
+  val APP_DESC = Value
+  val APP_ID = Value
+  val APP_STATE = Value
+  val BLOCK_ID = Value
+  val BLOCK_MANAGER_ID = Value
+  val BROADCAST_ID = Value
   val BUCKET = Value
+  val CLASS_LOADER = Value
+  val CLASS_NAME = Value
+  val COMMAND = Value
+  val COMMAND_OUTPUT = Value
+  val CONFIG = Value
+  val CONFIG2 = Value
   val CONTAINER_ID = Value
+  val COUNT = Value
+  val DRIVER_ID = Value
+  val ERROR = Value
+  val EVENT_QUEUE = Value
   val EXECUTOR_ID = Value
+  val EXECUTOR_STATE_CHANGED = Value
   val EXIT_CODE = Value
+  val HOST = Value
+  val JOB_ID = Value
+  val LINE = Value
+  val LINE_NUM = Value
+  val MASTER_URL = Value
+  val MAX_ATTEMPTS = Value
   val MAX_EXECUTOR_FAILURES = Value
   val MAX_SIZE = Value
   val MIN_SIZE = Value
+  val OLD_BLOCK_MANAGER_ID = Value
+  val PARTITION_ID = Value
+  val PATH = Value
   val POD_ID = Value
+  val REASON = Value
   val REMOTE_ADDRESS = Value
+  val RETRY_COUNT = Value
+  val RPC_ADDRESS = Value
+  val SHUFFLE_ID = Value
+  val SIZE = Value
+  val STAGE_ID = Value
+  val SUBMISSION_ID = Value
+  val TASK_ATTEMPT_ID = Value
+  val TASK_ID = Value
+  val TASK_NAME = Value
+  val TASK_SET_NAME = Value
+  val TASK_STATE = Value
+  val TID = Value
+  val TIMEOUT = Value
+  val WORKER_URL = Value
   type LogKey = Value
 }
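For context on how these keys are consumed, the pattern adopted throughout this patch looks roughly like the sketch below. It assumes a class that mixes in org.apache.spark.internal.Logging (which supplies the log"..." interpolator used in the hunks that follow) and reuses the APP_ID and EXIT_CODE keys defined above; the class and method names are illustrative only, not part of this change.

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{APP_ID, EXIT_CODE}

    class AppMonitor extends Logging {  // hypothetical class, for illustration
      def onAppFailed(appId: String, exitCode: Int): Unit = {
        // Each variable is wrapped in MDC(<LogKey>, <value>) so it appears in the
        // rendered message and can also be emitted as a structured field keyed by the LogKey.
        logError(log"Application ${MDC(APP_ID, appId)} exited with " +
          log"code ${MDC(EXIT_CODE, exitCode)}")
      }
    }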
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ed590a67522c..faa8504df365 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -34,7 +34,8 @@ import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOut
 import org.roaringbitmap.RoaringBitmap
 import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -725,11 +726,12 @@ private[spark] class MapOutputTrackerMaster(
     // Make sure that we aren't going to exceed the max RPC message size by making sure
     // we use broadcast to send large map output statuses.
     if (minSizeForBroadcast > maxRpcMessageSize) {
-      val msg = s"${SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key} ($minSizeForBroadcast bytes) " +
-        s"must be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an " +
-        "rpc message that is too large."
-      logError(msg)
-      throw new IllegalArgumentException(msg)
+      val logEntry = log"${MDC(CONFIG, SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key)} " +
+        log"(${MDC(MIN_SIZE, minSizeForBroadcast)} bytes) " +
+        log"must be <= spark.rpc.message.maxSize (${MDC(MAX_SIZE, maxRpcMessageSize)} " +
+        log"bytes) to prevent sending an rpc message that is too large."
+      logError(logEntry)
+      throw new IllegalArgumentException(logEntry.message)
     }
     def post(message: MapOutputTrackerMasterMessage): Unit = {
@@ -815,7 +817,7 @@ private[spark] class MapOutputTrackerMaster(
       case None if shuffleMigrationEnabled =>
         logWarning(s"Asked to update map output for unknown shuffle ${shuffleId}")
       case None =>
-        logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
+        logError(log"Asked to update map output for unknown shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
     }
   }
@@ -1736,9 +1738,11 @@ private[spark] object MapOutputTracker extends Logging {
   def validateStatus(status: ShuffleOutputStatus, shuffleId: Int, partition: Int) : Unit = {
     if (status == null) {
-      val errorMessage = s"Missing an output location for shuffle $shuffleId partition $partition"
+      // scalastyle:off line.size.limit
+      val errorMessage = log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)} partition ${MDC(PARTITION_ID, partition)}"
+      // scalastyle:on
       logError(errorMessage)
-      throw new MetadataFetchFailedException(shuffleId, partition, errorMessage)
+      throw new MetadataFetchFailedException(shuffleId, partition, errorMessage.message)
     }
   }
 }
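One shape worth noting from the hunks above: log"..." yields a log-entry value rather than a plain String, so call sites that previously reused the message text for an exception now read it back via .message (logEntry.message and errorMessage.message above). A minimal sketch of that shape, with an illustrative class and a generic exception standing in for the Spark-specific one:

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.SHUFFLE_ID

    class ShuffleChecks extends Logging {  // hypothetical class, for illustration
      def fail(shuffleId: Int): Nothing = {
        val entry = log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)}"
        logError(entry)                                 // structured record with SHUFFLE_ID attached
        throw new IllegalStateException(entry.message)  // .message yields the rendered plain text
      }
    }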
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala
index b4aaea648715..fb0584b45846 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala
@@ -23,7 +23,8 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, PATH}
 /**
  * Process that starts a Py4J server on an ephemeral port.
@@ -40,7 +41,7 @@ private[spark] object PythonGatewayServer extends Logging {
   gatewayServer.start()
   val boundPort: Int = gatewayServer.getListeningPort
   if (boundPort == -1) {
-    logError(s"${gatewayServer.server.getClass} failed to bind; exiting")
+    logError(log"${MDC(CLASS_NAME, gatewayServer.server.getClass)} failed to bind; exiting")
     System.exit(1)
   } else {
     val address = InetAddress.getLoopbackAddress()
@@ -62,7 +63,7 @@ private[spark] object PythonGatewayServer extends Logging {
     dos.close()
     if (!tmpPath.renameTo(connectionInfoPath)) {
-      logError(s"Unable to write connection information to $connectionInfoPath.")
+      logError(log"Unable to write connection information to ${MDC(PATH, connectionInfoPath)}.")
       System.exit(1)
     }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 3f476772d65a..4f7c5bc0b0c0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.io._
 import org.apache.spark.SparkException
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -49,7 +50,7 @@ private[python] object Converter extends Logging {
     } match {
       case Success(c) => c
       case Failure(err) =>
-        logError(s"Failed to load converter: $cc")
+        logError(log"Failed to load converter: ${MDC(CLASS_NAME, cc)}")
         throw err
     }
   }.getOrElse { defaultConverter }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 0d35d1bd1f29..b6ba9bbf29f3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -27,7 +27,8 @@ import scala.reflect.ClassTag
 import scala.util.Random
 import org.apache.spark._
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.BROADCAST_ID
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
@@ -176,7 +177,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
       blocks.length
     } catch {
       case t: Throwable =>
-        logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
+        // scalastyle:off line.size.limit
+        logError(log"Store broadcast ${MDC(BROADCAST_ID, broadcastId)} fail, remove all pieces of the broadcast")
+        // scalastyle:on
         blockManager.removeBroadcast(id, tellMaster = true)
         throw t
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 13a67a794c83..1eec3e82f1b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -31,7 +31,8 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS}
 import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
 import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
@@ -173,8 +174,7 @@ private class ClientEndpoint(
     // Exception, if present
     exception match {
       case Some(e) =>
-        logError(s"Exception from cluster was: $e")
-        e.printStackTrace()
+        logError("Exception from cluster", e)
         System.exit(-1)
       case _ =>
         state.get match {
@@ -197,7 +197,7 @@ private class ClientEndpoint(
     } else if (exception.exists(e => Utils.responseFromBackup(e.getMessage))) {
       logDebug(s"The status response is reported from a backup spark instance. So, ignored.")
     } else {
-      logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
+      logError(log"ERROR: Cluster master did not recognize ${MDC(DRIVER_ID, submittedDriverID)}")
       System.exit(-1)
     }
   }
@@ -226,7 +226,7 @@ private class ClientEndpoint(
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     if (!lostMasters.contains(remoteAddress)) {
-      logError(s"Error connecting to master $remoteAddress.")
+      logError(log"Error connecting to master ${MDC(RPC_ADDRESS, remoteAddress)}.")
       lostMasters += remoteAddress
       // Note that this heuristic does not account for the fact that a Master can recover within
       // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
@@ -240,8 +240,7 @@ private class ClientEndpoint(
   override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
     if (!lostMasters.contains(remoteAddress)) {
-      logError(s"Error connecting to master ($remoteAddress).")
-      logError(s"Cause was: $cause")
+      logError(log"Error connecting to master (${MDC(RPC_ADDRESS, remoteAddress)}).", cause)
       lostMasters += remoteAddress
       if (lostMasters.size >= masterEndpoints.size) {
         logError("No master is available, exiting.")
@@ -251,8 +250,7 @@ private class ClientEndpoint(
   }
   override def onError(cause: Throwable): Unit = {
-    logError(s"Error processing messages, exiting.")
-    cause.printStackTrace()
+    logError("Error processing messages, exiting.", cause)
     System.exit(-1)
   }
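A second recurring change, visible in the Client.scala hunks above and again in SparkSubmit.scala below, is that exceptions are no longer stringified into the message or printed with printStackTrace(); the Throwable is passed as logError's second argument so the logging backend renders the stack trace. A rough sketch of that shape follows; MasterLink and doConnect are placeholders for illustration, not APIs from this patch.

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.RPC_ADDRESS
    import org.apache.spark.rpc.RpcAddress

    class MasterLink(masterAddress: RpcAddress) extends Logging {  // hypothetical, for illustration
      def connect(doConnect: () => Unit): Unit = {
        try {
          doConnect()
        } catch {
          case e: Exception =>
            // The cause goes in the second parameter instead of being interpolated,
            // matching the logError(log"...", cause) calls introduced above.
            logError(log"Error connecting to master ${MDC(RPC_ADDRESS, masterAddress)}.", e)
        }
      }
    }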
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 789dc5d50ffc..9ab394741a82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
@@ -966,7 +967,7 @@ private[spark] class SparkSubmit extends Logging {
       mainClass = Utils.classForName(childMainClass)
     } catch {
       case e: ClassNotFoundException =>
-        logError(s"Failed to load class $childMainClass.")
+        logError(log"Failed to load class ${MDC(CLASS_NAME, childMainClass)}.")
         if (childMainClass.contains("thriftserver")) {
           logInfo(s"Failed to load main class $childMainClass.")
           logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
@@ -977,7 +978,7 @@ private[spark] class SparkSubmit extends Logging {
         }
         throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
       case e: NoClassDefFoundError =>
-        logError(s"Failed to load $childMainClass: ${e.getMessage()}")
+        logError(log"Failed to load ${MDC(CLASS_NAME, childMainClass)}", e)
         if (e.getMessage.contains("org/apache/hadoop/hive")) {
           logInfo(s"Failed to load hive class.")
           logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
@@ -1013,7 +1014,7 @@ private[spark] class SparkSubmit extends Logging {
         try {
           SparkContext.getActive.foreach(_.stop())
         } catch {
-          case e: Throwable => logError(s"Failed to close SparkContext: $e")
+          case e: Throwable => logError("Failed to close SparkContext", e)
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
index 02c01a5598e8..59e52e649498 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
@@ -23,7 +23,8 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.deploy.history.EventFilter.FilterStatistics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LINE, LINE_NUM, PATH}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -98,8 +99,8 @@ private[spark] object EventFilter extends Logging {
         }
       } catch {
         case e: Exception =>
-          logError(s"Exception parsing Spark event log: ${path.getName}", e)
-          logError(s"Malformed line #$lineNum: $line\n")
+          logError(log"Exception parsing Spark event log: ${MDC(PATH, path.getName)}", e)
+          logError(log"Malformed line #${MDC(LINE_NUM, lineNum)}: ${MDC(LINE, line)}\n")
           throw e
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index fd3dc98be352..283443425635 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -32,7 +32,8 @@ import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.deploy.rest.StandaloneRestServer
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{APP_DESC, APP_ID, EXECUTOR_ID, RETRY_COUNT}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy._
@@ -568,8 +569,9 @@ private[deploy] class Master(
           && maxExecutorRetries >= 0) { // < 0 disables this application-killing path
           val execs = appInfo.executors.values
           if (!execs.exists(_.state == ExecutorState.RUNNING)) {
-            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
-              s"${appInfo.retryCount} times; removing it")
+            logError(log"Application ${MDC(APP_DESC, appInfo.desc.name)} " +
+              log"with ID ${MDC(APP_ID, appInfo.id)} " +
+              log"failed ${MDC(RETRY_COUNT, appInfo.retryCount)} times; removing it")
             removeApplication(appInfo, ApplicationState.FAILED)
           }
         }
@@ -1245,7 +1247,9 @@ private[deploy] class Master(
       Some(executorId.toInt)
     } catch {
       case e: NumberFormatException =>
-        logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
+        // scalastyle:off line.size.limit
+        logError(log"Encountered executor with a non-integer ID: ${MDC(EXECUTOR_ID, executorId)}. Ignoring")
+        // scalastyle:on
         None
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 71c07ce33d3d..9107b2f5528c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -33,7 +33,8 @@ import jakarta.servlet.http.HttpServletResponse
 import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkApplication
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, ERROR, SUBMISSION_ID}
 import org.apache.spark.util.Utils
 /**
@@ -329,7 +330,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
         !connection.getContentType().contains("application/json")) {
       throw new SubmitRestProtocolException(s"Server responded with exception:\n${errString}")
     }
-    logError(s"Server responded with error:\n${errString}")
+    logError(log"Server responded with error:\n${MDC(ERROR, errString)}")
     val error = new ErrorResponse
     if (responseCode == RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION) {
       error.highestProtocolVersion = RestSubmissionServer.PROTOCOL_VERSION
@@ -350,7 +351,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
     response match {
       // If the response is an error, log the message
       case error: ErrorResponse =>
-        logError(s"Server responded with error:\n${error.message}")
+        logError(log"Server responded with error:\n${MDC(ERROR, error.message)}")
         error
       // Otherwise, simply return the response
       case response: SubmitRestProtocolResponse => response
@@ -447,7 +448,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError(s"Application submission failed$failMessage")
+      logError(log"Application submission failed${MDC(ERROR, failMessage)}")
     }
   }
@@ -470,7 +471,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
       // Log driver state, if present
       driverState match {
         case Some(state) => logInfo(s"State of driver $submissionId is now $state.")
-        case _ => logError(s"State of driver $submissionId was not found!")
+        case _ =>
+          logError(log"State of driver ${MDC(SUBMISSION_ID, submissionId)} was not found!")
       }
       // Log worker node, if present
       (workerId, workerHostPort) match {
@@ -478,12 +480,12 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
         case _ =>
       }
       // Log exception stack trace, if present
-      exception.foreach { e => logError(e) }
+      exception.foreach { e => logError(log"${MDC(ERROR, e)}") }
       return
     }
     Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
   }
-  logError(s"Error: Master did not recognize driver $submissionId.")
+  logError(log"Error: Master did not recognize driver ${MDC(SUBMISSION_ID, submissionId)}.")
 }
 /** Log the response sent by the server in the REST application submission protocol. */
@@ -493,7 +495,9 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
   /** Log an appropriate error if the response sent by the server is not of the expected type. */
   private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
-    logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
+    // scalastyle:off line.size.limit
+    logError(log"Error: Server responded with message of unexpected type ${MDC(CLASS_NAME, unexpected.messageType)}.")
+    // scalastyle:on
   }
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 96ceb1e5e121..e2ba221fb00c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -37,7 +37,8 @@ import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, EXECUTOR_STATE_CHANGED, MASTER_URL, MAX_ATTEMPTS}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
@@ -498,7 +499,7 @@ private[deploy] class Worker(
     case RegisterWorkerFailed(message) =>
       if (!registered) {
-        logError("Worker registration failed: " + message)
+        logError(log"Worker registration failed: ${MDC(ERROR, message)}")
         System.exit(1)
       }
@@ -690,7 +691,7 @@ private[deploy] class Worker(
         case Some(runner) =>
          runner.kill()
         case None =>
-          logError(s"Asked to kill unknown driver $driverId")
+          logError(log"Asked to kill unknown driver ${MDC(DRIVER_ID, driverId)}")
       }
     case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
@@ -807,8 +808,9 @@ private[deploy] class Worker(
           }
           self.send(newState)
         } else {
-          logError(s"Failed to send $newState to Master $masterRef for " +
-            s"$executorStateSyncMaxAttempts times. Giving up.")
+          logError(log"Failed to send ${MDC(EXECUTOR_STATE_CHANGED, newState)} " +
+            log"to Master ${MDC(MASTER_URL, masterRef)} for " +
+            log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)} times. Giving up.")
           System.exit(1)
         }
       }(executorStateSyncFailureHandler)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index deb5bb1a6974..248899411274 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -19,7 +19,8 @@ package org.apache.spark.deploy.worker
 import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.WORKER_URL
 import org.apache.spark.rpc._
 /**
@@ -75,7 +76,7 @@ private[spark] class WorkerWatcher(
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     if (isWorker(remoteAddress)) {
       // This log message will never be seen
-      logError(s"Lost connection to worker rpc endpoint $workerUrl. Exiting.")
+      logError(log"Lost connection to worker rpc endpoint ${MDC(WORKER_URL, workerUrl)}. Exiting.")
       exitNonZero()
     }
   }
@@ -83,8 +84,9 @@ private[spark] class WorkerWatcher(
   override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
     if (isWorker(remoteAddress)) {
       // These logs may not be seen if the worker (and associated pipe) has died
-      logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
-      logError(s"Error was: $cause")
+      logError(
+        log"Could not initialize connection to worker ${MDC(WORKER_URL, workerUrl)}. Exiting.",
+        cause)
       exitNonZero()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 85acfd81482a..8488333ec3ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,8 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, REASON}
 import org.apache.spark.internal.config._
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.util.NettyUtils
@@ -282,7 +283,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       throwable: Throwable = null,
       notifyDriver: Boolean = true) = {
     if (stopping.compareAndSet(false, true)) {
-      val message = "Executor self-exiting due to : " + reason
+      val message = log"Executor self-exiting due to : ${MDC(REASON, reason)}"
       if (throwable != null) {
         logError(message, throwable)
       } else {
@@ -320,9 +321,9 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (migrationEnabled) {
         env.blockManager.decommissionBlockManager()
       } else if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-        logError(s"Storage decommissioning attempted but neither " +
-          s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key} or " +
-          s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key} is enabled ")
+        logError(log"Storage decommissioning attempted but neither " +
+          log"${MDC(CONFIG, STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key)} or " +
+          log"${MDC(CONFIG2, STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key)} is enabled ")
       }
       if (executor != null) {
         executor.decommission()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 206b293e08c2..67d0c37c3edd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,7 +40,8 @@ import org.slf4j.MDC
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC => LogMDC}
+import org.apache.spark.internal.LogKey.{ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.plugin.PluginContainer
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
@@ -795,7 +796,7 @@ private[spark] class Executor(
         case t: Throwable if env.isStopped =>
           // Log the expected exception after executor.stop without stack traces
           // see: SPARK-19147
-          logError(s"Exception in $taskName: ${t.getMessage}")
+          logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}: ${LogMDC(ERROR, t.getMessage)}")
         case t: Throwable =>
           // Attempt to exit cleanly by informing the driver of our failure.
@@ -1011,8 +1012,9 @@ private[spark] class Executor(
       if (!taskRunner.isFinished && timeoutExceeded()) {
         val killTimeoutMs = TimeUnit.NANOSECONDS.toMillis(killTimeoutNs)
         if (isLocal) {
-          logError(s"Killed task $taskId could not be stopped within $killTimeoutMs ms; " +
-            "not killing JVM because we are running in local mode.")
+          logError(log"Killed task ${LogMDC(TASK_ID, taskId)} could not be stopped within " +
+            log"${LogMDC(TIMEOUT, killTimeoutMs)} ms; " +
+            log"not killing JVM because we are running in local mode.")
         } else {
           // In non-local-mode, the exception thrown here will bubble up to the uncaught exception
           // handler and cause the executor JVM to exit.
@@ -1244,8 +1246,8 @@ private[spark] class Executor(
           logWarning("Issue communicating with driver in heartbeater", e)
           heartbeatFailures += 1
           if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
-            logError(s"Exit as unable to send heartbeats to driver " +
-              s"more than $HEARTBEAT_MAX_FAILURES times")
+            logError(log"Exit as unable to send heartbeats to driver " +
+              log"more than ${LogMDC(MAX_ATTEMPTS, HEARTBEAT_MAX_FAILURES)} times")
             System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
           }
         }
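One detail worth calling out from the Executor.scala hunk above: that file already imports org.slf4j.MDC, so Spark's MDC is brought in under an alias (MDC => LogMDC) and the aliased name is what appears inside the interpolator. The same trick should apply anywhere both names are in scope; a minimal sketch, with an illustrative class name:

    import org.slf4j.MDC                                        // pre-existing slf4j import, as in Executor.scala
    import org.apache.spark.internal.{Logging, MDC => LogMDC}   // alias the structured-logging MDC to avoid the clash
    import org.apache.spark.internal.LogKey.TASK_NAME

    class TaskLogger extends Logging {  // hypothetical class, for illustration
      def reportFailure(taskName: String): Unit = {
        // The aliased name is used in the log interpolator.
        logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}")
      }
    }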
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index a84fadcf965b..20239980eee5 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp
 import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils}
@@ -152,7 +153,7 @@ object SparkHadoopWriter extends Logging {
           config.closeWriter(taskContext)
         } finally {
           committer.abortTask(taskContext)
-          logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
+          logError(log"Task ${MDC(TASK_ATTEMPT_ID, taskContext.getTaskAttemptID)} aborted.")
         }
       })
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 777bc0a60e01..05e0b6b9c4ef 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -26,7 +26,8 @@ import com.codahale.metrics.{Metric, MetricRegistry}
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.sink.{MetricsServlet, PrometheusServlet, Sink}
 import org.apache.spark.metrics.source.{Source, StaticSources}
@@ -227,7 +228,7 @@ private[spark] class MetricsSystem private (
         }
       } catch {
         case e: Exception =>
-          logError("Sink class " + classPath + " cannot be instantiated")
+          logError(log"Sink class ${MDC(CLASS_NAME, classPath)} cannot be instantiated")
           throw e
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index c4a35c4cdb47..086df6231324 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -34,6 +34,8 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.errors.SparkCoreErrors
+import org.apache.spark.internal.LogKey.{COMMAND, ERROR}
+import org.apache.spark.internal.MDC
 import org.apache.spark.util.Utils
@@ -221,8 +223,8 @@ private[spark] class PipedRDD[T: ClassTag](
       val t = childThreadException.get()
       if (t != null) {
         val commandRan = command.mkString(" ")
-        logError(s"Caught exception while running pipe() operator. Command ran: $commandRan. " +
-          s"Exception: ${t.getMessage}")
+        logError(log"Caught exception while running pipe() operator. Command ran: " +
+          log"${MDC(COMMAND, commandRan)}. Exception: ${MDC(ERROR, t.getMessage)}")
         proc.destroy()
         cleanup()
         throw t
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 5164c30fce0a..2709fe27f406 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import com.codahale.metrics.{Gauge, Timer}
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EVENT_QUEUE
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
@@ -164,9 +165,9 @@ private class AsyncEventQueue(
     droppedEventsCounter.incrementAndGet()
     if (logDroppedEvent.compareAndSet(false, true)) {
       // Only log the following message once to avoid duplicated annoying logs.
-      logError(s"Dropping event from queue $name. " +
-        "This likely means one of the listeners is too slow and cannot keep up with " +
-        "the rate at which tasks are being started by the scheduler.")
+      logError(log"Dropping event from queue ${MDC(EVENT_QUEUE, name)}. " +
+        log"This likely means one of the listeners is too slow and cannot keep up with " +
+        log"the rate at which tasks are being started by the scheduler.")
     }
     logTrace(s"Dropping event $event")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b4333d5533c5..7ee8dc7ec0c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -36,8 +36,8 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.{JOB_ID, STAGE_ID}
 import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener}
@@ -857,7 +857,7 @@ private[spark] class DAGScheduler(
   private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = {
     val registeredStages = jobIdToStageIds.get(job.jobId)
     if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
-      logError("No stages registered for job " + job.jobId)
+      logError(log"No stages registered for job ${MDC(JOB_ID, job.jobId)}")
     } else {
       stageIdToStage.filter { case (stageId, _) =>
         registeredStages.get.contains(stageId)
@@ -865,9 +865,9 @@ private[spark] class DAGScheduler(
         case (stageId, stage) =>
           val jobSet = stage.jobIds
           if (!jobSet.contains(job.jobId)) {
-            logError(
-              "Job %d not registered for stage %d even though that stage was registered for the job"
-                .format(job.jobId, stageId))
+            // scalastyle:off line.size.limit
+            logError(log"Job ${MDC(JOB_ID, job.jobId)} not registered for stage ${MDC(STAGE_ID, stageId)} even though that stage was registered for the job")
+            // scalastyle:on
           } else {
             def removeStage(stageId: Int): Unit = {
               // data structures based on Stage
@@ -2868,17 +2868,16 @@ private[spark] class DAGScheduler(
     var ableToCancelStages = true
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
-      logError(s"No stages registered for job ${job.jobId}")
+      logError(log"No stages registered for job ${MDC(JOB_ID, job.jobId)}")
     }
     stages.foreach { stageId =>
       val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
       if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
-        logError(
-          "Job %d not registered for stage %d even though that stage was registered for the job"
-            .format(job.jobId, stageId))
+        logError(log"Job ${MDC(JOB_ID, job.jobId)} not registered for stage " +
+          log"${MDC(STAGE_ID, stageId)} even though that stage was registered for the job")
       } else if (jobsForStage.get.size == 1) {
         if (!stageIdToStage.contains(stageId)) {
-          logError(s"Missing Stage for stage with id $stageId")
+          logError(log"Missing Stage for stage with id ${MDC(STAGE_ID, stageId)}")
         } else {
           // This stage is only used by the job, so finish the stage if it is running.
           val stage = stageIdToStage(stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index d7ddeade2fd4..aaa9e5bdd9e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicReference
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.HOST
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 /**
@@ -199,13 +199,13 @@ private[scheduler] class HealthTracker (
           logInfo(s"Decommissioning all executors on excluded host $node " +
            s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
           if (!a.decommissionExecutorsOnHost(node)) {
-            logError(s"Decommissioning executors on $node failed.")
+            logError(log"Decommissioning executors on ${MDC(HOST, node)} failed.")
           }
         } else {
           logInfo(s"Killing all executors on excluded host $node " +
            s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
           if (!a.killExecutorsOnHost(node)) {
-            logError(s"Killing executors on node $node failed.")
+            logError(log"Killing executors on node ${MDC(HOST, node)} failed.")
           }
         }
       case None =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index ca12835c32bc..bd0bff18ff57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -30,7 +30,8 @@ import com.codahale.metrics.{Counter, MetricRegistry, Timer}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, MAX_SIZE}
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
@@ -288,8 +289,9 @@ private[spark] class LiveListenerBusMetrics(conf: SparkConf)
     if (perListenerClassTimers.size == maxTimed) {
       if (maxTimed != 0) { // Explicitly disabled.
- logError(s"Not measuring processing time for listener class $className because a " + - s"maximum of $maxTimed listener classes are already timed.") + logError(log"Not measuring processing time for listener class " + + log"${MDC(CLASS_NAME, className)} because a " + + log"maximum of ${MDC(MAX_SIZE, maxTimed)} listener classes are already timed.") } None } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index dbb4fa74ded1..a3b8f1206b9d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -24,7 +24,8 @@ import scala.io.{Codec, Source} import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LINE, LINE_NUM} import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.util.JsonProtocol @@ -125,7 +126,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { throw ioe case e: Exception => logError(s"Exception parsing Spark event log: $sourceName", e) - logError(s"Malformed line #$lineNumber: $currentLine\n") + logError(log"Malformed line #${MDC(LINE_NUM, lineNumber)}: ${MDC(LINE, currentLine)}\n") false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index a4f293950955..e93bc0747349 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -25,7 +25,8 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.TaskState.TaskState -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_LOADER import org.apache.spark.serializer.{SerializerHelper, SerializerInstance} import org.apache.spark.util.{LongAccumulator, ThreadUtils, Utils} @@ -148,7 +149,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul // Log an error but keep going here -- the task failed, so not catastrophic // if we can't deserialize the reason. 
logError( - "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) + log"Could not deserialize TaskEndReason: ClassNotFound with classloader " + + log"${MDC(CLASS_LOADER, loader)}") case _: Exception => // No-op } finally { // If there's an error while deserializing the TaskEndReason, this Runnable diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index dc202aa1bb71..17c44926d626 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -33,7 +33,8 @@ import org.apache.spark.InternalAccumulator.{input, shuffleRead} import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, LogKey, MDC} +import org.apache.spark.internal.LogKey.{REASON, TASK_SET_NAME, TASK_STATE, TID} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEndpoint @@ -426,7 +427,9 @@ private[spark] class TaskSchedulerImpl( } } catch { case e: TaskNotSerializableException => - logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") + // scalastyle:off line.size.limit + logError(log"Resource offer failed, task set ${MDC(TASK_SET_NAME, taskSet.name)} was not serializable") + // scalastyle:on // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. return (noDelayScheduleRejects, minLaunchedLocality) @@ -805,10 +808,8 @@ private[spark] class TaskSchedulerImpl( } case None => logError( - ("Ignoring update with state %s for TID %s because its task set is gone (this is " + - "likely the result of receiving duplicate task finished status updates) or its " + - "executor has been marked as failed.") - .format(state, tid)) + log"Ignoring update with state ${MDC(TASK_STATE, state)} for TID ${MDC(TID, tid)} because its task set is gone (this is " + + log"likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.") } } catch { case e: Exception => logError("Exception in statusUpdate", e) @@ -1023,7 +1024,7 @@ private[spark] class TaskSchedulerImpl( // one may be triggered by a dropped connection from the worker while another may be a // report of executor termination. We produce log messages for both so we // eventually report the termination reason. 
- logError(s"Lost an executor $executorId (already removed): $reason") + logError(log"Lost an executor ${MDC(LogKey.EXECUTOR_ID, executorId)} (already removed): ${MDC(REASON, reason)}") } } } @@ -1051,7 +1052,7 @@ private[spark] class TaskSchedulerImpl( logInfo(s"Executor $executorId on $hostPort is decommissioned" + s"${getDecommissionDuration(executorId)}.") case _ => - logError(s"Lost executor $executorId on $hostPort: $reason") + logError(log"Lost executor ${MDC(LogKey.EXECUTOR_ID, executorId)} on ${MDC(LogKey.HOST, hostPort)}: ${MDC(REASON, reason)}") } // return decommission duration in string or "" if decommission startTime not exists diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b8ba6375e27a..dc0656778455 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -30,7 +30,8 @@ import org.apache.spark.InternalAccumulator import org.apache.spark.InternalAccumulator.{input, shuffleRead} import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.SchedulingMode._ import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils} @@ -769,11 +770,12 @@ private[spark] class TaskSetManager( totalResultSize += size calculatedTasks += 1 if (!isShuffleMapTasks && maxResultSize > 0 && totalResultSize > maxResultSize) { - val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + - s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " + - s"(${Utils.bytesToString(maxResultSize)})" + val msg = log"Total size of serialized results of ${MDC(COUNT, calculatedTasks)} tasks " + + log"(${MDC(SIZE, Utils.bytesToString(totalResultSize))}) is bigger than " + + log"${MDC(CONFIG, config.MAX_RESULT_SIZE.key)} " + + log"(${MDC(MAX_SIZE, Utils.bytesToString(maxResultSize))})" logError(msg) - abort(msg) + abort(msg.message) false } else { true @@ -959,7 +961,8 @@ private[spark] class TaskSetManager( val task = taskName(tid) if (ef.className == classOf[NotSerializableException].getName) { // If the task result wasn't serializable, there's no point in trying to re-execute it. - logError(s"$task had a not serializable result: ${ef.description}; not retrying") + logError(log"${MDC(TASK_NAME, task)} had a not serializable result: " + + log"${MDC(ERROR, ef.description)}; not retrying") emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, accumUpdates, metricPeaks) abort(s"$task had a not serializable result: ${ef.description}") @@ -968,8 +971,9 @@ private[spark] class TaskSetManager( if (ef.className == classOf[TaskOutputFileAlreadyExistException].getName) { // If we can not write to output file in the task, there's no point in trying to // re-execute it. 
- logError("Task %s in stage %s (TID %d) can not write to output file: %s; not retrying" - .format(info.id, taskSet.id, tid, ef.description)) + logError(log"Task ${MDC(TASK_ID, info.id)} in stage ${MDC(STAGE_ID, taskSet.id)} " + + log"(TID ${MDC(TID, tid)}) can not write to output file: " + + log"${MDC(ERROR, ef.description)}; not retrying") emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null, accumUpdates, metricPeaks) abort("Task %s in stage %s (TID %d) can not write to output file: %s".format( @@ -1033,8 +1037,8 @@ private[spark] class TaskSetManager( info.host, info.executorId, index, failureReason)) numFailures(index) += 1 if (numFailures(index) >= maxTaskFailures) { - logError("Task %d in stage %s failed %d times; aborting job".format( - index, taskSet.id, maxTaskFailures)) + logError(log"Task ${MDC(TASK_ID, index)} in stage ${MDC(STAGE_ID, taskSet.id)} failed " + + log"${MDC(MAX_ATTEMPTS, maxTaskFailures)} times; aborting job") abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" .format(index, taskSet.id, maxTaskFailures, failureReason), failureException) return diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index cebe885378e3..06cfb53e2ded 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -32,7 +32,8 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorLogUrlHandler -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network._ import org.apache.spark.resource.ResourceProfile @@ -240,7 +241,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp listenerBus.post(SparkListenerMiscellaneousProcessAdded(time, processId, info)) case e => - logError(s"Received unexpected message. ${e}") + logError(log"Received unexpected message. 
${MDC(ERROR, e)}") } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -360,7 +361,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId)) case e => - logError(s"Received unexpected ask ${e}") + logError(log"Received unexpected ask ${MDC(ERROR, e)}") } // Make fake resource offers on all executors diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 2150b996f058..f92756105977 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -27,7 +27,8 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.REASON import org.apache.spark.internal.config.EXECUTOR_REMOVE_DELAY import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} @@ -161,7 +162,7 @@ private[spark] class StandaloneSchedulerBackend( notifyContext() if (!stopping.get) { launcherBackend.setState(SparkAppHandle.State.KILLED) - logError("Application has been killed. Reason: " + reason) + logError(log"Application has been killed. Reason: ${MDC(REASON, reason)}") try { scheduler.error(reason) } finally { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index ab34bae996cd..b878c88c43b0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -26,7 +26,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException} import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.io.NioBufferedFileInputStream import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID @@ -417,7 +418,8 @@ private[spark] class IndexShuffleBlockResolver( } finally { logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}") if (indexTmp.exists() && !indexTmp.delete()) { - logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}") + logError(log"Failed to delete temporary index file at " + + log"${MDC(PATH, indexTmp.getAbsolutePath)}") } checksumTmpOpt.foreach { checksumTmp => if (checksumTmp.exists()) { @@ -430,8 +432,8 @@ private[spark] class IndexShuffleBlockResolver( case e: Exception => // Unlike index deletion, we won't propagate the error for the checksum file since // checksum is only a best-effort. 
- logError(s"Failed to delete temporary checksum file " + - s"at ${checksumTmp.getAbsolutePath}", e) + logError(log"Failed to delete temporary checksum file " + + log"at ${MDC(PATH, checksumTmp.getAbsolutePath)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 89b3914e94af..e68239f260d9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -41,8 +41,8 @@ import org.apache.commons.io.IOUtils import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.DataReadMethod -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.BLOCK_ID import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source @@ -649,7 +649,7 @@ private[spark] class BlockManager( for ((blockId, info) <- blockInfoManager.entries) { val status = getCurrentBlockStatus(blockId, info) if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) { - logError(s"Failed to report $blockId to master; giving up.") + logError(log"Failed to report ${MDC(BLOCK_ID, blockId)} to master; giving up.") return } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 5dd536eeb304..ac453d0f743c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -31,7 +31,8 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, OLD_BLOCK_MANAGER_ID} import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} @@ -697,8 +698,9 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(id.executorId) match { case Some(oldId) => // A block manager of the same executor already exists, so remove it (assumed dead) - logError("Got two different block manager registrations on same executor - " - + s" will replace old one $oldId with new one $id") + logError(log"Got two different block manager registrations on same executor - " + + log" will replace old one ${MDC(OLD_BLOCK_MANAGER_ID, oldId)} " + + log"with new one ${MDC(BLOCK_MANAGER_ID, id)}") removeExecutor(id.executorId) case None => } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 2096da2fca02..80e268081fa7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -24,7 +24,8 @@ import java.util.zip.Checksum import 
org.apache.spark.SparkException import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, PATH} import org.apache.spark.io.MutableCheckedOutputStream import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleWriteMetricsReporter @@ -74,8 +75,8 @@ private[spark] class DiskBlockObjectWriter( // get IOException when flushing the buffered data. We should catch and log the exception // to ensure the revertPartialWritesAndClose() function doesn't throw an exception. case e: IOException => - logError("Exception occurred while manually close the output stream to file " - + file + ", " + e.getMessage) + logError(log"Exception occurred while manually close the output stream to file " + + log"${MDC(PATH, file)}, ${MDC(ERROR, e.getMessage)}") } } } @@ -267,8 +268,8 @@ private[spark] class DiskBlockObjectWriter( // don't log the exception stack trace to avoid confusing users. // See: SPARK-28340 case ce: ClosedByInterruptException => - logError("Exception occurred while reverting partial writes to file " - + file + ", " + ce.getMessage) + logError(log"Exception occurred while reverting partial writes to file " + + log"${MDC(PATH, file)}, ${MDC(ERROR, ce.getMessage)}") case e: Exception => logError("Uncaught exception while reverting partial writes to file " + file, e) } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 4b59c1fdfa24..916cb83d379e 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -36,7 +36,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark.{MapOutputTracker, SparkException, TaskContext} import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, MAX_ATTEMPTS} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper} @@ -585,7 +586,8 @@ final class ShuffleBlockFetcherIterator( // don't log the exception stack trace to avoid confusing users. 
          // See: SPARK-28340
           case ce: ClosedByInterruptException =>
-            logError("Error occurred while fetching local blocks, " + ce.getMessage)
+            logError(
+              log"Error occurred while fetching local blocks, ${MDC(ERROR, ce.getMessage)}")
           case ex: Exception => logError("Error occurred while fetching local blocks", ex)
         }
         results.put(FailureFetchResult(blockId, mapIndex, blockManager.blockManagerId, e))
@@ -874,8 +876,8 @@ final class ShuffleBlockFetcherIterator(
             assert(buf.isInstanceOf[FileSegmentManagedBuffer])
             e match {
               case ce: ClosedByInterruptException =>
-                logError("Failed to create input stream from local block, " +
-                  ce.getMessage)
+                lazy val error = MDC(ERROR, ce.getMessage)
+                logError(log"Failed to create input stream from local block, $error")
               case e: IOException =>
                 logError("Failed to create input stream from local block", e)
             }
@@ -964,9 +966,10 @@ final class ShuffleBlockFetcherIterator(
       case FailureFetchResult(blockId, mapIndex, address, e) =>
         var errorMsg: String = null
         if (e.isInstanceOf[OutOfDirectMemoryError]) {
-          errorMsg = s"Block $blockId fetch failed after $maxAttemptsOnNettyOOM " +
-            s"retries due to Netty OOM"
-          logError(errorMsg)
+          val logMessage = log"Block ${MDC(BLOCK_ID, blockId)} fetch failed after " +
+            log"${MDC(MAX_ATTEMPTS, maxAttemptsOnNettyOOM)} retries due to Netty OOM"
+          logError(logMessage)
+          errorMsg = logMessage.message
         }
         throwFetchFailedException(blockId, mapIndex, address, e, Some(errorMsg))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e1ced9f8b41d..d7e174f5497c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -68,7 +68,8 @@ import org.slf4j.Logger
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{COMMAND, COMMAND_OUTPUT, EXIT_CODE, PATH}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Streaming._
 import org.apache.spark.internal.config.Tests.IS_TESTING
@@ -808,12 +809,14 @@ private[spark] object Utils
         chmod700(dir)
         Some(dir.getAbsolutePath)
       } else {
-        logError(s"Failed to create dir in $root. Ignoring this directory.")
+        logError(log"Failed to create dir in ${MDC(PATH, root)}. Ignoring this directory.")
+        None
       }
     } catch {
       case e: IOException =>
-        logError(s"Failed to create local root dir in $root. Ignoring this directory.")
+        logError(
+          log"Failed to create local root dir in ${MDC(PATH, root)}. Ignoring this directory.")
         None
     }
   }
@@ -1216,7 +1219,8 @@ private[spark] object Utils
     val exitCode = process.waitFor()
     stdoutThread.join()   // Wait for it to finish reading output
     if (exitCode != 0) {
-      logError(s"Process $command exited with code $exitCode: $output")
+      logError(log"Process ${MDC(COMMAND, command)} exited with code " +
+        log"${MDC(EXIT_CODE, exitCode)}: ${MDC(COMMAND_OUTPUT, output)}")
       throw new SparkException(s"Process $command exited with code $exitCode")
     }
     output.toString
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6b1aa5cb4436..dfd52a0fdb24 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -56,7 +56,7 @@ import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.APPLICATION_ID
+import org.apache.spark.internal.LogKey.APP_ID
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -1199,7 +1199,7 @@ private[spark] class Client(
         getApplicationReport()
       } catch {
         case e: ApplicationNotFoundException =>
-          logError(log"Application ${MDC(APPLICATION_ID, appId)} not found.")
+          logError(log"Application ${MDC(APP_ID, appId)} not found.")
           cleanupStagingDir()
           return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
         case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 806a73eda76c..a8e655aed1bf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.APPLICATION_STATE
+import org.apache.spark.internal.LogKey.APP_STATE
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -118,7 +118,7 @@ private[spark] class YarnClientSchedulerBackend(
     val YarnAppReport(_, state, diags) =
       client.monitorApplication(logApplicationReport = false)
     logError(log"YARN application has exited unexpectedly with state " +
-      log"${MDC(APPLICATION_STATE, state)}! Check the YARN application logs for more details.")
+      log"${MDC(APP_STATE, state)}! Check the YARN application logs for more details.")
     diags.foreach { err =>
       logError(s"Diagnostics message: $err")
    }