From 3631a0484619b2549b5bd3817543bda91e2049e9 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Sat, 18 May 2024 23:32:34 +1000 Subject: [PATCH 01/12] Migrate to structured logging --- .../org/apache/spark/internal/LogKey.scala | 33 ++++ .../deploy/history/FsHistoryProvider.scala | 69 ++++--- .../deploy/rest/RestSubmissionClient.scala | 27 ++- .../apache/spark/deploy/worker/Worker.scala | 82 ++++---- .../CoarseGrainedExecutorBackend.scala | 18 +- .../org/apache/spark/executor/Executor.scala | 62 +++--- .../executor/ExecutorLogUrlHandler.scala | 9 +- .../io/HadoopMapRedCommitProtocol.scala | 6 +- .../spark/internal/io/SparkHadoopWriter.scala | 7 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../ResourceDiscoveryScriptPlugin.scala | 6 +- .../spark/resource/ResourceProfile.scala | 8 +- .../resource/ResourceProfileManager.scala | 5 +- .../apache/spark/scheduler/DAGScheduler.scala | 180 ++++++++++-------- .../collection/ExternalAppendOnlyMap.scala | 5 +- 15 files changed, 311 insertions(+), 208 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index e03987933306..5e492a2decb8 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -31,10 +31,13 @@ trait LogKey { */ object LogKeys { case object ACCUMULATOR_ID extends LogKey + case object ACL_ENABLED extends LogKey case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey case object ADDED_JARS extends LogKey + case object ADMIN_ACLS extends LogKey + case object ADMIN_ACL_GROUPS extends LogKey case object AGGREGATE_FUNCTIONS extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey @@ -45,8 +48,10 @@ object LogKeys { case object APP_ID extends LogKey case object APP_NAME extends LogKey case object APP_STATE extends LogKey + case object ARCHIVE_NAME extends LogKey case object ARGS extends LogKey case object AUTH_ENABLED extends LogKey + case object ATTRIBUTE_MAP extends LogKey case object BACKUP_FILE extends LogKey case object BARRIER_EPOCH extends LogKey case object BARRIER_ID extends LogKey @@ -73,6 +78,7 @@ object LogKeys { case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey case object CALL_SITE_SHORT_FORM extends LogKey + case object CANCEL_FUTURE_JOBS extends LogKey case object CATALOG_NAME extends LogKey case object CATEGORICAL_FEATURES extends LogKey case object CHECKPOINT_FILE extends LogKey @@ -262,10 +268,12 @@ object LogKeys { case object HIVE_OPERATION_STATE extends LogKey case object HIVE_OPERATION_TYPE extends LogKey case object HOST extends LogKey + case object HOST_NAME extends LogKey case object HOST_NAMES extends LogKey case object HOST_PORT extends LogKey case object HOST_PORT2 extends LogKey case object HUGE_METHOD_LIMIT extends LogKey + case object HYBRID_STORE_DISK_BACKEND extends LogKey case object IDENTIFIER extends LogKey case object INCOMPATIBLE_TYPES extends LogKey case object INDEX extends LogKey @@ -276,6 +284,7 @@ object LogKeys { case object INIT extends LogKey case object INITIAL_CAPACITY extends LogKey case object INITIAL_HEARTBEAT_INTERVAL extends LogKey + case object INITIAL_REGISTRATION_RETRIES extends LogKey case object INIT_MODE extends LogKey case object INPUT extends LogKey case object INTERVAL extends LogKey 
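A minimal sketch of how the LogKey constants added above are consumed by the structured-logging calls in the hunks that follow: each interpolated value is wrapped in MDC together with a LogKey so it can be emitted as a structured field as well as plain text. The RegistrationLogger class and its message are hypothetical; the Logging/LogKeys/MDC imports, the log"..." interpolator, and LogKeys.WORKER_ID / LogKeys.MASTER_URL are the ones this patch uses.

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical class, for illustration only; just the logging pattern mirrors this patch.
class RegistrationLogger extends Logging {
  def logRegistered(workerId: String, masterUrl: String): Unit = {
    // Multi-part messages are built by concatenating log"..." fragments,
    // with every variable tagged by a LogKey via MDC.
    logInfo(log"Worker ${MDC(LogKeys.WORKER_ID, workerId)} successfully" +
      log" registered with master ${MDC(LogKeys.MASTER_URL, masterUrl)}")
  }
}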
@@ -288,6 +297,7 @@ object LogKeys { case object JAVA_VERSION extends LogKey case object JAVA_VM_NAME extends LogKey case object JOB_ID extends LogKey + case object JOB_TAG extends LogKey case object JOIN_CONDITION extends LogKey case object JOIN_CONDITION_SUB_EXPR extends LogKey case object JOIN_TYPE extends LogKey @@ -340,6 +350,7 @@ object LogKeys { case object MAX_METHOD_CODE_SIZE extends LogKey case object MAX_NUM_BINS extends LogKey case object MAX_NUM_CHUNKS extends LogKey + case object MAX_NUM_FILES extends LogKey case object MAX_NUM_PARTITIONS extends LogKey case object MAX_NUM_POSSIBLE_BINS extends LogKey case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey @@ -427,6 +438,7 @@ object LogKeys { case object NUM_LOADED_ENTRIES extends LogKey case object NUM_LOCAL_DIRS extends LogKey case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey + case object NUM_MERGERS extends LogKey case object NUM_MERGER_LOCATIONS extends LogKey case object NUM_META_FILES extends LogKey case object NUM_NODES extends LogKey @@ -441,6 +453,7 @@ object LogKeys { case object NUM_POINT extends LogKey case object NUM_PREFIXES extends LogKey case object NUM_PRUNED extends LogKey + case object NUM_RELEASED_LOCKS extends LogKey case object NUM_REMOVED_WORKERS extends LogKey case object NUM_REPLICAS extends LogKey case object NUM_REQUESTS extends LogKey @@ -486,6 +499,7 @@ object LogKeys { case object PARTITIONER extends LogKey case object PARTITIONS_SIZE extends LogKey case object PARTITION_ID extends LogKey + case object PARTITION_IDS extends LogKey case object PARTITION_SPECIFICATION extends LogKey case object PARTITION_SPECS extends LogKey case object PATH extends LogKey @@ -535,6 +549,7 @@ object LogKeys { case object QUERY_PLAN_LENGTH_MAX extends LogKey case object QUERY_RUN_ID extends LogKey case object RANGE extends LogKey + case object RDD_CREATION_SITE extends LogKey case object RDD_CHECKPOINT_DIR extends LogKey case object RDD_DEBUG_STRING extends LogKey case object RDD_DESCRIPTION extends LogKey @@ -550,10 +565,13 @@ object LogKeys { case object RECOVERY_STATE extends LogKey case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey + case object REGEX extends LogKey + case object REGISTER_MERGE_RESULT extends LogKey case object REGISTERED_EXECUTOR_FILE extends LogKey case object RELATION_NAME extends LogKey case object RELATION_OUTPUT extends LogKey case object RELATIVE_TOLERANCE extends LogKey + case object RELEASED_LOCKS extends LogKey case object REMAINING_PARTITIONS extends LogKey case object REMOTE_ADDRESS extends LogKey case object REMOVE_FROM_MASTER extends LogKey @@ -565,10 +583,12 @@ object LogKeys { case object RESOURCE_PROFILE_ID extends LogKey case object RESOURCE_PROFILE_IDS extends LogKey case object RESOURCE_PROFILE_TO_TOTAL_EXECS extends LogKey + case object RESPONSE extends LogKey case object RESPONSE_BODY_SIZE extends LogKey case object RESULT extends LogKey case object RESULT_SIZE_BYTES extends LogKey case object RESULT_SIZE_BYTES_MAX extends LogKey + case object REST_PROTOCOL_RESPONSE_MESSAGE_TYPE extends LogKey case object RETRY_COUNT extends LogKey case object RETRY_INTERVAL extends LogKey case object RETRY_WAIT_TIME extends LogKey @@ -630,8 +650,16 @@ object LogKeys { case object SPILL_TIMES extends LogKey case object SQL_TEXT extends LogKey case object SRC_PATH extends LogKey + case object STAGE extends LogKey + case object STAGES extends LogKey + case object STAGE2 extends LogKey case object STAGE_ATTEMPT extends LogKey + case object STAGE_ATTEMPT_ID 
extends LogKey + case object STAGE_ATTEMPT_NUMBER extends LogKey case object STAGE_ID extends LogKey + case object STAGE_NAME extends LogKey + case object STAGE_NAME2 extends LogKey + case object STAGE_PARENTS extends LogKey case object START_INDEX extends LogKey case object STATEMENT_ID extends LogKey case object STATE_STORE_ID extends LogKey @@ -666,10 +694,12 @@ object LogKeys { case object TARGET_NUM_EXECUTOR extends LogKey case object TARGET_NUM_EXECUTOR_DELTA extends LogKey case object TARGET_PATH extends LogKey + case object TASK extends LogKey case object TASK_ATTEMPT_ID extends LogKey case object TASK_ID extends LogKey case object TASK_NAME extends LogKey case object TASK_REQUIREMENTS extends LogKey + case object TASK_RESOURCES extends LogKey case object TASK_SET_NAME extends LogKey case object TASK_STATE extends LogKey case object TEMP_FILE extends LogKey @@ -702,6 +732,7 @@ object LogKeys { case object TOTAL extends LogKey case object TOTAL_EFFECTIVE_TIME extends LogKey case object TOTAL_RECORDS_READ extends LogKey + case object TOTAL_REGISTRATION_RETRIES extends LogKey case object TOTAL_TIME extends LogKey case object TOTAL_TIME_READ extends LogKey case object TO_TIME extends LogKey @@ -723,6 +754,7 @@ object LogKeys { case object URI extends LogKey case object URIS extends LogKey case object URL extends LogKey + case object URLS extends LogKey case object URL2 extends LogKey case object USER_ID extends LogKey case object USER_NAME extends LogKey @@ -739,6 +771,7 @@ object LogKeys { case object WEIGHTED_NUM extends LogKey case object WORKER extends LogKey case object WORKER_HOST extends LogKey + case object WORKER_MEMORY extends LogKey case object WORKER_ID extends LogKey case object WORKER_PORT extends LogKey case object WORKER_URL extends LogKey diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 53b239ddfd79..010aef1b00c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -44,6 +44,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ import org.apache.spark.internal.config.Tests.IS_TESTING +import org.apache.spark.internal.config.UI import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ @@ -107,9 +108,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE) private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS) private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS) - logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") + - "; users with admin permissions: " + historyUiAdminAcls.mkString(",") + - "; groups with admin permissions: " + historyUiAdminAclsGroups.mkString(",")) + logInfo(log"History server ui acls" + + log" ${MDC(ACL_ENABLED, if (historyUiAclsEnable) "enabled" else "disabled")}" + + log"; users with admin permissions:" + + log" ${MDC(LogKeys.ADMIN_ACLS, historyUiAdminAcls.mkString(","))}" + + log"; groups with admin permissions:" + + log" ${MDC(ADMIN_ACL_GROUPS, historyUiAdminAclsGroups.mkString(","))}") private val hadoopConf = 
SparkHadoopUtil.get.newConfiguration(conf) // Visible for testing @@ -482,8 +486,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) true } catch { case e: IllegalArgumentException => - logInfo("Exception in getting modificationTime of " - + reader.rootPath.getName + ". " + e.toString) + logInfo(log"Exception in getting modificationTime of" + + log" ${MDC(PATH, reader.rootPath.getName)}. ${MDC(EXCEPTION, e.toString)}") false } } @@ -550,7 +554,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { if (conf.get(CLEANER_ENABLED) && reader.modificationTime < clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000) { - logInfo(s"Deleting expired event log ${reader.rootPath.toString}") + logInfo(log"Deleting expired event log ${MDC(PATH, reader.rootPath.toString)}") deleteLog(fs, reader.rootPath) // If the LogInfo read had succeeded, but the ApplicationInafoWrapper // read failure and throw the exception, we should also cleanup the log @@ -801,7 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val listener = new AppListingListener(reader, clock, shouldHalt) bus.addListener(listener) - logInfo(s"Parsing $logPath for listing data...") + logInfo(log"Parsing ${MDC(PATH, logPath)} for listing data...") val logFiles = reader.listEventLogFiles parseAppEventLogs(logFiles, bus, !appCompleted, eventsFilter) @@ -829,7 +833,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath, fs)) { in => val target = lastFile.getLen - reparseChunkSize if (target > 0) { - logInfo(s"Looking for end event; skipping $target bytes from $logPath...") + logInfo(log"Looking for end event; skipping ${MDC(NUM_BYTES, target)} bytes" + + log" from ${MDC(PATH, logPath)}...") var skipped = 0L while (skipped < target) { skipped += in.skip(target - skipped) @@ -848,7 +853,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - logInfo(s"Finished parsing $logPath") + logInfo(log"Finished parsing ${MDC(PATH, logPath)}") listener.applicationInfo match { case Some(app) if !lookForEndEvent || app.attempts.head.info.completed => @@ -883,7 +888,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // In this case, the attempt is still not marked as finished but was expected to. This can // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. 
- logInfo(s"Reparsing $logPath since end event was not found.") + logInfo(log"Reparsing ${MDC(PATH, logPath)} since end event was not found.") doMergeApplicationListingInternal(reader, scanTime, enableOptimizations = false, lastEvaluatedForCompaction) @@ -952,7 +957,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val log = listing.read(classOf[LogInfo], logPath) if (log.lastProcessed <= maxTime && log.appId.isEmpty) { - logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } @@ -994,7 +999,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs } stale.filterNot(isProcessing).foreach { log => if (log.appId.isEmpty) { - logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } @@ -1005,7 +1010,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed")) var count = num - maxNum if (count > 0) { - logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.") + logInfo(log"Try to delete ${MDC(NUM_FILES, count)} old event logs" + + log" to keep ${MDC(MAX_NUM_FILES, maxNum)} logs in total.") KVUtils.foreach(listing.view(classOf[ApplicationInfoWrapper]).index("oldestAttempt")) { app => if (count > 0) { // Applications may have multiple attempts, some of which may not be completed yet. 
@@ -1034,7 +1040,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) var countDeleted = 0 toDelete.foreach { attempt => - logInfo(s"Deleting expired event log for ${attempt.logPath}") + logInfo(log"Deleting expired event log for ${MDC(PATH, attempt.logPath)}") val logPath = new Path(logDir, attempt.logPath) listing.delete(classOf[LogInfo], logPath.toString()) cleanAppData(app.id, attempt.info.attemptId, logPath.toString()) @@ -1082,7 +1088,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) false } if (deleteFile) { - logInfo(s"Deleting expired driver log for: $logFileStr") + logInfo(log"Deleting expired driver log for: ${MDC(PATH, logFileStr)}") listing.delete(classOf[LogInfo], logFileStr) deleteLog(driverLogFs, f.getPath()) } @@ -1095,7 +1101,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .reverse() .first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs } stale.filterNot(isProcessing).foreach { log => - logInfo(s"Deleting invalid driver log ${log.logPath}") + logInfo(log"Deleting invalid driver log ${MDC(PATH, log.logPath)}") listing.delete(classOf[LogInfo], log.logPath) deleteLog(driverLogFs, new Path(log.logPath)) } @@ -1124,10 +1130,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val eventLogFiles = reader.listEventLogFiles - logInfo(s"Parsing ${reader.rootPath} to re-build UI...") + logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...") parseAppEventLogs(eventLogFiles, replayBus, !reader.completed) trackingStore.close(false) - logInfo(s"Finished parsing ${reader.rootPath}") + logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}") } catch { case e: Exception => Utils.tryLogNonFatalError { @@ -1228,7 +1234,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) return KVUtils.open(path, metadata, conf, live = false) } catch { case e: Exception => - logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e) + logInfo(log"Failed to open existing store for" + + log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}.", e) dm.release(appId, attempt.info.attemptId, delete = true) } } @@ -1244,11 +1251,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: RuntimeException if e.getMessage != null && e.getMessage.contains("Not enough memory to create hybrid") => // Handle exception from `HistoryServerMemoryManager.lease`. - logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." + - s" Using $hybridStoreDiskBackend. " + e.getMessage) + logInfo(log"Failed to create HybridStore for" + + log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}." + + log" Using ${MDC(LogKeys.HYBRID_STORE_DISK_BACKEND, hybridStoreDiskBackend)}." + + log" ${MDC(EXCEPTION, e.getMessage)}") case e: Exception => - logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." + - s" Using $hybridStoreDiskBackend.", e) + logInfo(log"Failed to create HybridStore for" + + log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}." 
+ + log" Using ${MDC(LogKeys.HYBRID_STORE_DISK_BACKEND, hybridStoreDiskBackend)}.", e) } } @@ -1295,13 +1305,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Create a disk-base KVStore and start a background thread to dump data to it var lease: dm.Lease = null try { - logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + logInfo(log"Leasing disk manager space for app" + + log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}...") lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined) val diskStore = KVUtils.open(lease.tmpPath, metadata, conf, live = false) hybridStore.setDiskStore(diskStore) hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener { override def onSwitchToDiskStoreSuccess(): Unit = { - logInfo(s"Completely switched to diskStore for app $appId / ${attempt.info.attemptId}.") + logInfo(log"Completely switched to diskStore for app" + + log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}.") diskStore.close() val newStorePath = lease.commit(appId, attempt.info.attemptId) hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf, live = false)) @@ -1338,7 +1350,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), attempt.lastIndex) val isCompressed = reader.compressionCodec.isDefined - logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + logInfo(log"Leasing disk manager space for app" + + log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}...") val lease = dm.lease(reader.totalSize, isCompressed) try { Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store => @@ -1408,7 +1421,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) deleted = fs.delete(log, true) } catch { case _: AccessControlException => - logInfo(s"No permission to delete $log, ignoring.") + logInfo(log"No permission to delete ${MDC(PATH, log)}, ignoring.") case ioe: IOException => logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe) } @@ -1560,7 +1573,7 @@ private[history] class AppListingListener( val allProperties = event.environmentDetails("Spark Properties").toMap attempt.viewAcls = emptyStringToNone(allProperties.get(UI_VIEW_ACLS.key)) - attempt.adminAcls = emptyStringToNone(allProperties.get(ADMIN_ACLS.key)) + attempt.adminAcls = emptyStringToNone(allProperties.get(UI.ADMIN_ACLS.key)) attempt.viewAclsGroups = emptyStringToNone(allProperties.get(UI_VIEW_ACLS_GROUPS.key)) attempt.adminAclsGroups = emptyStringToNone(allProperties.get(ADMIN_ACLS_GROUPS.key)) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 247504f5ebbb..db0618d2978f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -79,7 +79,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { * it to the user. Otherwise, report the error message provided by the server. 
*/ def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to launch an application in $master.") + logInfo(log"Submitting a request to launch an application in ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -109,7 +109,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Request that the server kill the specified submission. */ def killSubmission(submissionId: String): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to kill submission $submissionId in $master.") + logInfo(log"Submitting a request to kill submission" + + log" ${MDC(SUBMISSION_ID, submissionId)} in ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -138,7 +139,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Request that the server kill all submissions. */ def killAllSubmissions(): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to kill all submissions in $master.") + logInfo(log"Submitting a request to kill all submissions in ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -167,7 +168,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Request that the server clears all submissions and applications. */ def clear(): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to clear $master.") + logInfo(log"Submitting a request to clear ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -196,7 +197,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Check the readiness of Master. */ def readyz(): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to check the status of $master.") + logInfo(log"Submitting a request to check the status of ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = new ErrorResponse for (m <- masters if !handled) { @@ -227,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { def requestSubmissionStatus( submissionId: String, quiet: Boolean = false): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request for the status of submission $submissionId in $master.") + logInfo(log"Submitting a request for the status of submission" + + log" ${MDC(SUBMISSION_ID, submissionId)} in ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null @@ -440,7 +442,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { if (submitResponse.success) { val submissionId = submitResponse.submissionId if (submissionId != null) { - logInfo(s"Submission successfully created as $submissionId. Polling submission state...") + logInfo(log"Submission successfully created as" + + log" ${MDC(SUBMISSION_ID, submissionId)}. 
Polling submission state...") pollSubmissionStatus(submissionId) } else { // should never happen @@ -470,13 +473,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { val exception = Option(statusResponse.message) // Log driver state, if present driverState match { - case Some(state) => logInfo(s"State of driver $submissionId is now $state.") + case Some(state) => logInfo(log"State of driver ${MDC(SUBMISSION_ID, submissionId)}" + + log" is now ${MDC(DRIVER_STATE, state)}.") case _ => logError(log"State of driver ${MDC(SUBMISSION_ID, submissionId)} was not found!") } // Log worker node, if present (workerId, workerHostPort) match { - case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.") + case (Some(id), Some(hp)) => logInfo(log"Driver is running on worker" + + log" ${MDC(WORKER_ID, id)} at ${MDC(HOST_PORT, hp)}.") case _ => } // Log exception stack trace, if present @@ -490,7 +495,9 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Log the response sent by the server in the REST application submission protocol. */ private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = { - logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}") + logInfo(log"Server responded with" + + log" ${MDC(REST_PROTOCOL_RESPONSE_MESSAGE_TYPE, response.messageType)}:\n" + + log"${MDC(RESPONSE, response.toJson)}") } /** Log an appropriate error if the response sent by the server is not of the expected type. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f030475131d2..49697641b30f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -74,7 +74,7 @@ private[deploy] class Worker( // If worker decommissioning is enabled register a handler on the configured signal to shutdown. 
if (conf.get(config.DECOMMISSION_ENABLED)) { val signal = conf.get(config.Worker.WORKER_DECOMMISSION_SIGNAL) - logInfo(s"Registering SIG$signal handler to trigger decommissioning.") + logInfo(log"Registering SIG${MDC(SIGNAL, signal)} handler to trigger decommissioning.") SignalUtils.register(signal, log"Failed to register SIG${MDC(LogKeys.SIGNAL, signal)} " + log"handler - disabling worker decommission feature.") { self.send(WorkerDecommissionSigReceived) @@ -106,8 +106,10 @@ private[deploy] class Worker( private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES) private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES) if (INITIAL_REGISTRATION_RETRIES > TOTAL_REGISTRATION_RETRIES) { - logInfo(s"${WORKER_INITIAL_REGISTRATION_RETRIES.key} ($INITIAL_REGISTRATION_RETRIES) is " + - s"capped by ${WORKER_MAX_REGISTRATION_RETRIES.key} ($TOTAL_REGISTRATION_RETRIES)") + logInfo(log"${MDC(KEY, WORKER_INITIAL_REGISTRATION_RETRIES.key)}" + + log" (${MDC(LogKeys.INITIAL_REGISTRATION_RETRIES, INITIAL_REGISTRATION_RETRIES)}) is" + + log" capped by ${MDC(KEY2, WORKER_MAX_REGISTRATION_RETRIES.key)}" + + log" (${MDC(LogKeys.TOTAL_REGISTRATION_RETRIES, TOTAL_REGISTRATION_RETRIES)})") } private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = { @@ -236,10 +238,11 @@ private[deploy] class Worker( override def onStart(): Unit = { assert(!registered) - logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( - host, port, cores, Utils.megabytesToString(memory))) - logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") - logInfo("Spark home: " + sparkHome) + logInfo(log"Starting Spark worker ${MDC(HOST, host)}:${MDC(PORT, port)} with" + + log" ${MDC(NUM_CORES, cores)} cores," + + log" ${MDC(WORKER_MEMORY, Utils.megabytesToString(memory))} RAM") + logInfo(log"Running Spark version ${MDC(SPARK_VERSION, org.apache.spark.SPARK_VERSION)}") + logInfo(log"Spark home: ${MDC(PATH, sparkHome)}") createWorkDir() startExternalShuffleService() setupWorkerResources() @@ -300,8 +303,9 @@ private[deploy] class Worker( master = Some(masterRef) connected = true if (reverseProxy) { - logInfo("WorkerWebUI is available at %s/proxy/%s".format( - activeMasterWebUiUrl.stripSuffix("/"), workerId)) + logInfo(log"WorkerWebUI is available at" + + log" ${MDC(WEB_URL, activeMasterWebUiUrl.stripSuffix("/"))}" + + log"/proxy/${MDC(WORKER_ID, workerId)}") // if reverseProxyUrl is not set, then we continue to generate relative URLs // starting with "/" throughout the UI and do not use activeMasterWebUiUrl val proxyUrl = conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") @@ -318,7 +322,7 @@ private[deploy] class Worker( registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { - logInfo("Connecting to master " + masterAddress + "...") + logInfo(log"Connecting to master ${MDC(MASTER_URL, masterAddress)}...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) sendRegisterMessageToMaster(masterEndpoint) } catch { @@ -342,7 +346,8 @@ private[deploy] class Worker( if (registered) { cancelLastRegistrationRetry() } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { - logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)") + logInfo(log"Retrying connection to master" + + log" (attempt # ${MDC(COUNT, connectionAttemptCount)})") /** * Re-register with the active master this worker has been communicating with. 
If there * is none, then it means this worker is still bootstrapping and hasn't established a @@ -376,7 +381,7 @@ private[deploy] class Worker( registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { - logInfo("Connecting to master " + masterAddress + "...") + logInfo(log"Connecting to master ${MDC(MASTER_URL, masterAddress)}...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) sendRegisterMessageToMaster(masterEndpoint) } catch { @@ -483,15 +488,15 @@ private[deploy] class Worker( log"${MDC(MASTER_URL, preferredMasterAddress)}") } - logInfo(s"Successfully registered with master $preferredMasterAddress") + logInfo(log"Successfully registered with master ${MDC(MASTER_URL, preferredMasterAddress)}") registered = true changeMaster(masterRef, masterWebUiUrl, masterAddress) forwardMessageScheduler.scheduleAtFixedRate( () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) if (CLEANUP_ENABLED) { - logInfo( - s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + logInfo(log"Worker cleanup enabled; old application directories will be deleted in:" + + log" ${MDC(PATH, workDir)}") forwardMessageScheduler.scheduleAtFixedRate( () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) @@ -539,7 +544,7 @@ private[deploy] class Worker( dir.isDirectory && !isAppStillRunning && !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS) }.foreach { dir => - logInfo(s"Removing directory: ${dir.getPath}") + logInfo(log"Removing directory: ${MDC(PATH, dir.getPath)}") Utils.deleteRecursively(dir) // Remove some registeredExecutors information of DB in external shuffle service when @@ -562,7 +567,7 @@ private[deploy] class Worker( } case MasterChanged(masterRef, masterWebUiUrl) => - logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) + logInfo(log"Master has changed, new master is at ${MDC(URL, masterRef.address.toSparkURL)}") changeMaster(masterRef, masterWebUiUrl, masterRef.address) val executorResponses = executors.values.map { e => @@ -575,7 +580,8 @@ private[deploy] class Worker( workerId, executorResponses.toList, driverResponses.toSeq)) case ReconnectWorker(masterUrl) => - logInfo(s"Master with url $masterUrl requested this worker to reconnect.") + logInfo(log"Master with url ${MDC(MASTER_URL, masterUrl)}" + + log" requested this worker to reconnect.") registerWithMaster() case LaunchExecutor(masterUrl, appId, execId, rpId, appDesc, cores_, memory_, resources_) => @@ -586,7 +592,8 @@ private[deploy] class Worker( logWarning("Asked to launch an executor while decommissioned. 
Not launching executor.") } else { try { - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + logInfo(log"Asked to launch executor ${MDC(APP_ID, appId)}/${MDC(EXECUTOR_ID, execId)}" + + log" for ${MDC(APP_NAME, appDesc.name)}") // Create the executor's working directory val executorDir = new File(workDir, appId + "/" + execId) @@ -667,15 +674,15 @@ private[deploy] class Worker( val fullId = appId + "/" + execId executors.get(fullId) match { case Some(executor) => - logInfo("Asked to kill executor " + fullId) + logInfo(log"Asked to kill executor ${MDC(EXECUTOR_ID, fullId)}") executor.kill() case None => - logInfo("Asked to kill unknown executor " + fullId) + logInfo(log"Asked to kill unknown executor ${MDC(EXECUTOR_ID, fullId)}") } } case LaunchDriver(driverId, driverDesc, resources_) => - logInfo(s"Asked to launch driver $driverId") + logInfo(log"Asked to launch driver ${MDC(DRIVER_ID, driverId)}") val driver = new DriverRunner( conf, driverId, @@ -695,7 +702,7 @@ private[deploy] class Worker( addResourcesUsed(resources_) case KillDriver(driverId) => - logInfo(s"Asked to kill driver $driverId") + logInfo(log"Asked to kill driver ${MDC(DRIVER_ID, driverId)}") drivers.get(driverId) match { case Some(runner) => runner.kill() @@ -735,7 +742,7 @@ private[deploy] class Worker( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (master.exists(_.address == remoteAddress) || masterAddressToConnect.contains(remoteAddress)) { - logInfo(s"$remoteAddress Disassociated !") + logInfo(log"${MDC(REMOTE_ADDRESS, remoteAddress)} Disassociated !") masterDisconnected() } } @@ -753,7 +760,7 @@ private[deploy] class Worker( try { appDirectories.remove(id).foreach { dirList => concurrent.Future { - logInfo(s"Cleaning up local directories for application $id") + logInfo(log"Cleaning up local directories for application ${MDC(APP_ID, id)}") dirList.foreach { dir => Utils.deleteRecursively(new File(dir)) } @@ -874,7 +881,7 @@ private[deploy] class Worker( private[deploy] def decommissionSelf(): Unit = { if (conf.get(config.DECOMMISSION_ENABLED) && !decommissioned) { decommissioned = true - logInfo(s"Decommission worker $workerId.") + logInfo(log"Decommission worker ${MDC(WORKER_ID, workerId)}.") } else if (decommissioned) { logWarning(log"Worker ${MDC(WORKER_ID, workerId)} already started decommissioning.") } else { @@ -898,12 +905,13 @@ private[deploy] class Worker( logWarning(log"Driver ${MDC(DRIVER_ID, driverId)} " + log"exited successfully while master is disconnected.") case _ => - logInfo(s"Driver $driverId exited successfully") + logInfo(log"Driver ${MDC(DRIVER_ID, driverId)} exited successfully") } case DriverState.KILLED => - logInfo(s"Driver $driverId was killed by user") + logInfo(log"Driver ${MDC(DRIVER_ID, driverId)} was killed by user") case _ => - logDebug(s"Driver $driverId changed state to $state") + logDebug(log"Driver ${MDC(DRIVER_ID, driverId)} changed state" + + log" to ${MDC(DRIVER_STATE, state)}") } sendToMaster(driverStateChanged) val driver = drivers.remove(driverId).get @@ -921,13 +929,13 @@ private[deploy] class Worker( if (ExecutorState.isFinished(state)) { val appId = executorStateChanged.appId val fullId = appId + "/" + executorStateChanged.execId - val message = executorStateChanged.message - val exitStatus = executorStateChanged.exitStatus + val message = executorStateChanged.message.map("message " + _).getOrElse("") + val exitStatus = executorStateChanged.exitStatus.map("exitStatus " + _).getOrElse("") executors.get(fullId) match { 
case Some(executor) => - logInfo("Executor " + fullId + " finished with state " + state + - message.map(" message " + _).getOrElse("") + - exitStatus.map(" exitStatus " + _).getOrElse("")) + logInfo(log"Executor ${MDC(EXECUTOR_ID, fullId)}" + + log" finished with state ${MDC(EXECUTOR_STATE, state)}" + + log" ${MDC(MESSAGE, message)} ${MDC(STATUS, exitStatus)}") executors -= fullId finishedExecutors(fullId) = executor trimFinishedExecutorsIfNecessary() @@ -939,9 +947,9 @@ private[deploy] class Worker( shuffleService.executorRemoved(executorStateChanged.execId.toString, appId) } case None => - logInfo("Unknown Executor " + fullId + " finished with state " + state + - message.map(" message " + _).getOrElse("") + - exitStatus.map(" exitStatus " + _).getOrElse("")) + logInfo(log"Unknown Executor ${MDC(EXECUTOR_ID, fullId)}" + + log" finished with state ${MDC(EXECUTOR_STATE, state)}" + + log" ${MDC(MESSAGE, message)} ${MDC(STATUS, exitStatus)}") } maybeCleanupApplication(appId) } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 21b30cf854e6..1b1053a7013e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -74,13 +74,14 @@ private[spark] class CoarseGrainedExecutorBackend( override def onStart(): Unit = { if (env.conf.get(DECOMMISSION_ENABLED)) { val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL) - logInfo(s"Registering SIG$signal handler to trigger decommissioning.") + logInfo(log"Registering SIG${MDC(LogKeys.SIGNAL, signal)}" + + log" handler to trigger decommissioning.") SignalUtils.register(signal, log"Failed to register SIG${MDC(LogKeys.SIGNAL, signal)} " + log"handler - disabling executor decommission feature.")( self.askSync[Boolean](ExecutorDecommissionSigReceived)) } - logInfo("Connecting to driver: " + driverUrl) + logInfo(log"Connecting to driver: ${MDC(LogKeys.URL, driverUrl)}" ) try { val securityManager = new SecurityManager(env.conf) val shuffleClientTransportConf = SparkTransportConf.fromSparkConf( @@ -182,7 +183,7 @@ private[spark] class CoarseGrainedExecutorBackend( exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = TaskDescription.decode(data.value) - logInfo("Got assigned task " + taskDesc.taskId) + logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}") executor.launchTask(this, taskDesc) } @@ -219,7 +220,7 @@ private[spark] class CoarseGrainedExecutorBackend( }.start() case UpdateDelegationTokens(tokenBytes) => - logInfo(s"Received tokens of ${tokenBytes.length} bytes") + logInfo(log"Received tokens of ${MDC(LogKeys.NUM_BYTES, tokenBytes.length)} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) case DecommissionExecutor => @@ -252,7 +253,8 @@ private[spark] class CoarseGrainedExecutorBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (stopping.get()) { - logInfo(s"Driver from $remoteAddress disconnected during shutdown") + logInfo(log"Driver from ${MDC(LogKeys.RPC_ADDRESS, remoteAddress)}" + + log" disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { exitExecutor(1, s"Driver $remoteAddress disassociated! 
Shutting down.", null, notifyDriver = false) @@ -315,8 +317,7 @@ private[spark] class CoarseGrainedExecutorBackend( log"already started decommissioning.") return } - val msg = s"Decommission executor $executorId." - logInfo(msg) + logInfo(log"Decommission executor ${MDC(LogKeys.EXECUTOR_ID, executorId)}.") try { decommissioned = true val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) && @@ -369,7 +370,8 @@ private[spark] class CoarseGrainedExecutorBackend( exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } } else { - logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks") + logInfo(log"Blocked from shutdown by" + + log" ${MDC(LogKeys.NUM_TASKS, executor.numRunningTasks)} running tasks") } Thread.sleep(sleep_time) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 68c38fb6179f..af37d61bfd82 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -81,10 +81,12 @@ private[spark] class Executor( resources: immutable.Map[String, ResourceInformation]) extends Logging { - logInfo(s"Starting executor ID $executorId on host $executorHostname") - logInfo(s"OS info ${System.getProperty("os.name")}, ${System.getProperty("os.version")}, " + - s"${System.getProperty("os.arch")}") - logInfo(s"Java version ${System.getProperty("java.version")}") + logInfo(log"Starting executor ID ${LogMDC(LogKeys.EXECUTOR_ID, executorId)}" + + log" on host ${LogMDC(HOST_NAME, executorHostname)}") + logInfo(log"OS info ${LogMDC(OS_NAME, System.getProperty("os.name"))}," + + log" ${LogMDC(OS_VERSION, System.getProperty("os.version"))}, " + + log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}") + logInfo(log"Java version ${LogMDC(JAVA_VERSION, System.getProperty("java.version"))}") private val executorShutdown = new AtomicBoolean(false) val stopHookReference = ShutdownHookManager.addShutdownHook( @@ -219,7 +221,7 @@ private[spark] class Executor( if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) { Utils.deleteRecursively(sessionBasedRoot) } - logInfo(s"Session evicted: ${state.sessionUUID}") + logInfo(log"Session evicted: ${LogMDC(UUID, state.sessionUUID)}") } }) .build[String, IsolatedSessionState] @@ -501,7 +503,8 @@ private[spark] class Executor( @volatile var task: Task[Any] = _ def kill(interruptThread: Boolean, reason: String): Unit = { - logInfo(s"Executor is trying to kill $taskName, reason: $reason") + logInfo(log"Executor is trying to kill ${LogMDC(TASK_NAME, taskName)}," + + log" reason: ${LogMDC(REASON, reason)}") reasonIfKilled = Some(reason) if (task != null) { synchronized { @@ -572,7 +575,7 @@ private[spark] class Executor( } else 0L Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader) val ser = env.closureSerializer.newInstance() - logInfo(s"Running $taskName") + logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}") execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStartTimeNs: Long = 0 var taskStartCpu: Long = 0 @@ -656,10 +659,11 @@ private[spark] class Executor( if (releasedLocks.nonEmpty && !threwException) { val errMsg = - s"${releasedLocks.size} block locks were not released by $taskName\n" + - releasedLocks.mkString("[", ", ", "]") + log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" + + log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" + + log" 
${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})" if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) { - throw SparkException.internalError(errMsg, category = "EXECUTOR") + throw SparkException.internalError(errMsg.message, category = "EXECUTOR") } else { logInfo(errMsg) } @@ -747,10 +751,12 @@ private[spark] class Executor( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) - logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)") + logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." + + log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { - logInfo(s"Finished $taskName. $resultSize bytes result sent to driver") + logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." + + log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent to driver") // toByteBuffer is safe here, guarded by maxDirectResultSize serializedDirectResult.toByteBuffer } @@ -762,7 +768,8 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { case t: TaskKilledException => - logInfo(s"Executor killed $taskName, reason: ${t.reason}") + logInfo(log"Executor killed ${LogMDC(TASK_NAME, taskName)}," + + log" reason: ${LogMDC(REASON, t.reason)}") val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) // Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an @@ -775,7 +782,8 @@ private[spark] class Executor( case _: InterruptedException | NonFatal(_) if task != null && task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") - logInfo(s"Executor interrupted and killed $taskName, reason: $killReason") + logInfo(log"Executor interrupted and killed ${LogMDC(TASK_NAME, taskName)}," + + log" reason: ${LogMDC(REASON, killReason)}") val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq @@ -1078,8 +1086,9 @@ private[spark] class Executor( private def createClassLoader(urls: Array[URL], useStub: Boolean): MutableURLClassLoader = { logInfo( - s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " + - urls.mkString("'", ",", "'") + log"Starting executor with user classpath" + + log" (userClassPathFirst = ${LogMDC(CLASS_PATH, userClassPathFirst)}): " + + log"${LogMDC(URLS, urls.mkString("'", ",", "'"))}" ) if (useStub) { @@ -1123,12 +1132,13 @@ private[spark] class Executor( sessionUUID: String): ClassLoader = { val classUri = sessionClassUri.getOrElse(conf.get("spark.repl.class.uri", null)) val classLoader = if (classUri != null) { - logInfo("Using REPL class URI: " + classUri) + logInfo(log"Using REPL class URI: ${LogMDC(LogKeys.URI, classUri)}") new ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) } else { parent } - logInfo(s"Created or updated repl class loader $classLoader for $sessionUUID.") + logInfo(log"Created or updated repl class loader ${LogMDC(CLASS_LOADER, classLoader)}" + + log" for ${LogMDC(UUID, sessionUUID)}.") classLoader } @@ -1163,14 +1173,16 @@ private[spark] class Executor( // Fetch missing dependencies for ((name, timestamp) <- newFiles if state.currentFiles.getOrElse(name, -1L) < timestamp) { - logInfo(s"Fetching $name with timestamp $timestamp") + logInfo(log"Fetching ${LogMDC(FILE_NAME, name)} with" + + log" timestamp ${LogMDC(TIMESTAMP, timestamp)}") // 
Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, root, conf, hadoopConf, timestamp, useCache = !isLocal) state.currentFiles(name) = timestamp } for ((name, timestamp) <- newArchives if state.currentArchives.getOrElse(name, -1L) < timestamp) { - logInfo(s"Fetching $name with timestamp $timestamp") + logInfo(log"Fetching ${LogMDC(ARCHIVE_NAME, name)} with" + + log" timestamp ${LogMDC(TIMESTAMP, timestamp)}") val sourceURI = new URI(name) val uriToDownload = Utils.getUriBuilder(sourceURI).fragment(null).build() val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf, @@ -1179,7 +1191,9 @@ private[spark] class Executor( root, if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName) logInfo( - s"Unpacking an archive $name from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") + log"Unpacking an archive ${LogMDC(ARCHIVE_NAME, name)}" + + log" from ${LogMDC(SOURCE_PATH, source.getAbsolutePath)}" + + log" to ${LogMDC(DESTINATION_PATH, dest.getAbsolutePath)}") Utils.deleteRecursively(dest) Utils.unpack(source, dest) state.currentArchives(name) = timestamp @@ -1190,7 +1204,8 @@ private[spark] class Executor( .orElse(state.currentJars.get(localName)) .getOrElse(-1L) if (currentTimeStamp < timestamp) { - logInfo(s"Fetching $name with timestamp $timestamp") + logInfo(log"Fetching ${LogMDC(JAR_URL, name)} with" + + log" timestamp ${LogMDC(TIMESTAMP, timestamp)}") // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, root, conf, hadoopConf, timestamp, useCache = !isLocal) @@ -1198,7 +1213,8 @@ private[spark] class Executor( // Add it to our class loader val url = new File(root, localName).toURI.toURL if (!state.urlClassLoader.getURLs().contains(url)) { - logInfo(s"Adding $url to class loader ${state.sessionUUID}") + logInfo(log"Adding ${LogMDC(LogKeys.URL, url)} to" + + log" class loader ${LogMDC(UUID, state.sessionUUID)}") state.urlClassLoader.addURL(url) if (isStubbingEnabledForState(state.sessionUUID)) { renewClassLoader = true diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala index 0ddeef8e9a82..2202489509fc 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala @@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.matching.Regex -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extends Logging { import ExecutorLogUrlHandler._ @@ -82,8 +83,10 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend allPatterns: Set[String], allAttributes: Set[String]): Unit = { if (informedForMissingAttributes.compareAndSet(false, true)) { - logInfo(s"Fail to renew executor log urls: $reason. Required: $allPatterns / " + - s"available: $allAttributes. Falling back to show app's original log urls.") + logInfo(log"Fail to renew executor log urls: ${MDC(LogKeys.REASON, reason)}." + + log" Required: ${MDC(LogKeys.REGEX, allPatterns)} / " + + log"available: ${MDC(LogKeys.ATTRIBUTE_MAP, allAttributes)}." 
+ + log" Falling back to show app's original log urls.") } } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala index af0aa4151876..44f8d7cd6363 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala @@ -20,6 +20,9 @@ package org.apache.spark.internal.io import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.{TaskAttemptContext => NewTaskAttemptContext} +import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.MDC + /** * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter * (from the old mapred API). @@ -32,7 +35,8 @@ class HadoopMapRedCommitProtocol(jobId: String, path: String) override def setupCommitter(context: NewTaskAttemptContext): OutputCommitter = { val config = context.getConfiguration.asInstanceOf[JobConf] val committer = config.getOutputCommitter - logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") + logInfo(log"Using output committer class" + + log" ${MDC(LogKeys.CLASS_NAME, committer.getClass.getCanonicalName)}") committer } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 52529ff1d4e9..f86144d1e889 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{JOB_ID, TASK_ATTEMPT_ID} +import org.apache.spark.internal.LogKeys.{JOB_ID, TASK_ATTEMPT_ID, TOTAL_TIME} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} @@ -98,10 +98,11 @@ object SparkHadoopWriter extends Logging { iterator = iter) }) - logInfo(s"Start to commit write Job ${jobContext.getJobID}.") + logInfo(log"Start to commit write Job ${MDC(JOB_ID, jobContext.getJobID)}.") val (_, duration) = Utils .timeTakenMs { committer.commitJob(jobContext, ret.toImmutableArraySeq) } - logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.") + logInfo(log"Write Job ${MDC(JOB_ID, jobContext.getJobID)} committed." + + log" Elapsed time: ${MDC(TOTAL_TIME, duration)} ms.") } catch { case cause: Throwable => logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1f44f7e782c4..ac93abf3fe7a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -211,7 +211,7 @@ abstract class RDD[T: ClassTag]( * @return This RDD. 
*/ def unpersist(blocking: Boolean = false): this.type = { - logInfo(s"Removing RDD $id from persistence list") + logInfo(log"Removing RDD ${MDC(RDD_ID, id)} from persistence list") sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala b/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala index d861e9177167..51de7e2b9ac7 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala @@ -23,7 +23,8 @@ import java.util.Optional import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.resource.ResourceDiscoveryPlugin -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys import org.apache.spark.util.Utils.executeAndGetOutput /** @@ -44,7 +45,8 @@ class ResourceDiscoveryScriptPlugin extends ResourceDiscoveryPlugin with Logging val resourceName = request.id.resourceName val result = if (script.isPresent) { val scriptFile = new File(script.get) - logInfo(s"Discovering resources for $resourceName with script: $scriptFile") + logInfo(log"Discovering resources for ${MDC(LogKeys.RESOURCE_NAME, resourceName)}" + + log" with script: ${MDC(LogKeys.PATH, scriptFile)}") // check that script exists and try to execute if (scriptFile.exists()) { val output = executeAndGetOutput(Seq(script.get), new File(".")) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index adc1eeeb3127..7dcde35de251 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -232,7 +232,7 @@ class ResourceProfile( } val limiting = if (taskLimit == -1) "cpu" else s"$limitingResource at $taskLimit tasks per executor" - logInfo(s"Limiting resource is $limiting") + logInfo(log"Limiting resource is ${MDC(RESOURCE, limiting)}") _executorResourceSlotsPerAddr = Some(numPartsPerResourceMap.toMap) _maxTasksPerExecutor = if (taskLimit == -1) Some(1) else Some(taskLimit) _limitingResource = Some(limitingResource) @@ -374,9 +374,9 @@ object ResourceProfile extends Logging { val defProf = new ResourceProfile(executorResources, taskResources) defProf.setToDefaultProfile() defaultProfile = Some(defProf) - logInfo("Default ResourceProfile created, executor resources: " + - s"${defProf.executorResources}, task resources: " + - s"${defProf.taskResources}") + logInfo(log"Default ResourceProfile created, executor resources: " + + log"${MDC(EXECUTOR_RESOURCES, defProf.executorResources)}, task resources: " + + log"${MDC(TASK_RESOURCES, defProf.taskResources)}") defProf } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 580a5b7bb07a..6a6b5067f70f 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -23,7 +23,8 @@ import scala.collection.mutable.HashMap import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.Evolving -import org.apache.spark.internal.{config, Logging} +import 
org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys import org.apache.spark.internal.config.Tests._ import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded} import org.apache.spark.util.Utils @@ -140,7 +141,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, if (putNewProfile) { // force the computation of maxTasks and limitingResource now so we don't have cost later rp.limitingResource(sparkConf) - logInfo(s"Added ResourceProfile id: ${rp.id}") + logInfo(log"Added ResourceProfile id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rp.id)}") listenerBus.post(SparkListenerResourceProfileAdded(rp)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cc9ae5eb1ebe..ef9bef788a21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -535,8 +535,9 @@ private[spark] class DAGScheduler( if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " + - s"shuffle ${shuffleDep.shuffleId}") + logInfo(log"Registering RDD ${MDC(RDD_ID, rdd.id)}" + + log" (${MDC(RDD_CREATION_SITE, rdd.getCreationSite)}) as input to " + + log"shuffle ${MDC(SHUFFLE_ID, shuffleDep.shuffleId)}") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length, shuffleDep.partitioner.numPartitions) } @@ -992,16 +993,18 @@ private[spark] class DAGScheduler( callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { - val start = System.nanoTime + val start = System.currentTimeMillis() val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => - logInfo("Job %d finished: %s, took %f s".format - (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + logInfo(log"Job ${MDC(JOB_ID, waiter.jobId)}" + + log" finished: ${MDC(CALL_SITE_SHORT_FORM, callSite.shortForm)}," + + log" took ${MDC(TOTAL_TIME, System.currentTimeMillis() - start)} ms") case scala.util.Failure(exception) => - logInfo("Job %d failed: %s, took %f s".format - (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + logInfo(log"Job ${MDC(JOB_ID, waiter.jobId)}" + + log" failed: ${MDC(CALL_SITE_SHORT_FORM, callSite.shortForm)}," + + log" took ${MDC(TOTAL_TIME, System.currentTimeMillis() - start)} ms") // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) @@ -1097,7 +1100,7 @@ private[spark] class DAGScheduler( * Cancel a job that is running or waiting in the queue. 
*/ def cancelJob(jobId: Int, reason: Option[String]): Unit = { - logInfo("Asked to cancel job " + jobId) + logInfo(log"Asked to cancel job ${MDC(JOB_ID, jobId)}") eventProcessLoop.post(JobCancelled(jobId, reason)) } @@ -1106,7 +1109,8 @@ private[spark] class DAGScheduler( * @param cancelFutureJobs if true, future submitted jobs in this job group will be cancelled */ def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false): Unit = { - logInfo(s"Asked to cancel job group $groupId with cancelFutureJobs=$cancelFutureJobs") + logInfo(log"Asked to cancel job group ${MDC(GROUP_ID, groupId)}" + + log" with cancelFutureJobs=${MDC(CANCEL_FUTURE_JOBS, cancelFutureJobs)}") eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs)) } @@ -1115,7 +1119,7 @@ private[spark] class DAGScheduler( */ def cancelJobsWithTag(tag: String): Unit = { SparkContext.throwIfInvalidTag(tag) - logInfo(s"Asked to cancel jobs with tag $tag") + logInfo(log"Asked to cancel jobs with tag ${MDC(JOB_TAG, tag)}") eventProcessLoop.post(JobTagCancelled(tag)) } @@ -1209,7 +1213,7 @@ private[spark] class DAGScheduler( // If cancelFutureJobs is true, store the cancelled job group id into internal states. // When a job belonging to this job group is submitted, skip running it. if (cancelFutureJobs) { - logInfo(s"Add job group $groupId into cancelled job groups") + logInfo(log"Add job group ${MDC(GROUP_ID, groupId)} into cancelled job groups") cancelledJobGroups.add(groupId) } @@ -1314,7 +1318,8 @@ private[spark] class DAGScheduler( if (jobGroupIdOpt.exists(cancelledJobGroups.contains(_))) { listener.jobFailed( SparkCoreErrors.sparkJobCancelledAsPartOfJobGroupError(jobId, jobGroupIdOpt.get)) - logInfo(s"Skip running a job that belongs to the cancelled job group ${jobGroupIdOpt.get}.") + logInfo(log"Skip running a job that belongs to the cancelled" + + log" job group ${MDC(GROUP_ID, jobGroupIdOpt.get)}.") return } @@ -1362,11 +1367,11 @@ private[spark] class DAGScheduler( val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties) clearCacheLocs() - logInfo("Got job %s (%s) with %d output partitions".format( - job.jobId, callSite.shortForm, partitions.length)) - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) + logInfo(log"Got job ${MDC(JOB_ID, job.jobId)} (${MDC(CALL_SITE_SHORT_FORM, callSite.shortForm)})" + + log" with ${MDC(NUM_PARTITIONS, partitions.length)} output partitions") + logInfo(log"Final stage: ${MDC(STAGE, finalStage)} (${MDC(STAGE_NAME, finalStage.name)})") + logInfo(log"Parents of final stage: ${MDC(STAGE_PARENTS, finalStage.parents)}") + logInfo(log"Missing parents: ${MDC(STAGE_PARENTS, getMissingParentStages(finalStage))}") val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job @@ -1403,11 +1408,11 @@ private[spark] class DAGScheduler( val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties) clearCacheLocs() - logInfo("Got map stage job %s (%s) with %d output partitions".format( - jobId, callSite.shortForm, dependency.rdd.partitions.length)) - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) + logInfo(log"Got map stage job ${MDC(JOB_ID, jobId)} (${MDC(CALL_SITE_SHORT_FORM, callSite.shortForm)})" + + log" with ${MDC(NUM_PARTITIONS, 
dependency.rdd.partitions.length)} output partitions") + logInfo(log"Final stage: ${MDC(STAGE, finalStage)} (${MDC(STAGE_NAME, finalStage.name)})") + logInfo(log"Parents of final stage: ${MDC(STAGE_PARENTS, finalStage.parents)}") + logInfo(log"Missing parents: ${MDC(STAGE_PARENTS, getMissingParentStages(finalStage))}") val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job @@ -1444,7 +1449,8 @@ private[spark] class DAGScheduler( val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { - logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") + logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_DESCRIPTION, stage.rdd)})," + + log" which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { @@ -1681,8 +1687,9 @@ private[spark] class DAGScheduler( } if (tasks.nonEmpty) { - logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + - s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") + logInfo(log"Submitting ${MDC(NUM_TASKS, tasks.size)} missing tasks" + + log" from ${MDC(STAGE, stage)} (${MDC(RDD_DESCRIPTION, stage.rdd)}) (first 15 " + + log"tasks are for partitions ${MDC(PARTITION_IDS, tasks.take(15).map(_.partitionId))})") val shuffleId = stage match { case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId) case _: ResultStage => None @@ -1926,8 +1933,8 @@ private[spark] class DAGScheduler( try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement // killTask. - logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " + - "or zombie tasks for this job") + logInfo(log"Job ${MDC(JOB_ID, job.jobId)} is finished. Cancelling potential speculative " + + log"or zombie tasks for this job") // ResultStage is only used by this job. It's safe to kill speculative or // zombie tasks in this stage. 
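// Illustrative sketch only: long messages are split into several log"..." parts joined with +,
// as in the submitMissingTasks hunk above. Each part is a structured message that keeps its MDC
// context when concatenated, and the rendered text is exactly the concatenation of the parts,
// so continuation parts carry an explicit leading space.
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{NUM_TASKS, STAGE}

class ConcatenationExample extends Logging {
  def submitting(numTasks: Int, stage: String): Unit = {
    logInfo(log"Submitting ${MDC(NUM_TASKS, numTasks)} missing tasks" +
      log" from ${MDC(STAGE, stage)}") // note the leading space in the second literal
  }
}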
taskScheduler.killAllTaskAttempts( @@ -1954,7 +1961,7 @@ private[spark] class DAGScheduler( } } case None => - logInfo("Ignoring result from " + rt + " because its job has finished") + logInfo(log"Ignoring result from ${MDC(TASK, rt)} because its job has finished") } case smt: ShuffleMapTask => @@ -1969,7 +1976,8 @@ private[spark] class DAGScheduler( logDebug("ShuffleMapTask finished on " + execId) if (executorFailureEpoch.contains(execId) && smt.epoch <= executorFailureEpoch(execId)) { - logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") + logInfo(log"Ignoring possibly bogus ${MDC(TASK, smt)}" + + log" completion from executor ${MDC(EXECUTOR_ID, execId)}") } else { // The epoch of the task is acceptable (i.e., the task was launched after the most // recent failure we're aware of for the executor), so mark the task's output as @@ -1978,7 +1986,7 @@ private[spark] class DAGScheduler( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) } } else { - logInfo(s"Ignoring $smt completion from an older attempt of indeterminate stage") + logInfo(log"Ignoring ${MDC(TASK, smt)} completion from an older attempt of indeterminate stage") } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { @@ -1996,17 +2004,19 @@ private[spark] class DAGScheduler( val mapStage = shuffleIdToMapStage(shuffleId) if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) { - logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + - s"(attempt ${failedStage.latestInfo.attemptNumber()}) running") + logInfo(log"Ignoring fetch failure from ${MDC(TASK, task)} as it's from" + + log" ${MDC(STAGE, failedStage)} attempt ${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)}" + + log" and there is a more recent attempt for that stage " + + log"(attempt ${MDC(STAGE_ATTEMPT_NUMBER, failedStage.latestInfo.attemptNumber())}) running") } else { val ignoreStageFailure = ignoreDecommissionFetchFailure && isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress) if (ignoreStageFailure) { - logInfo(s"Ignoring fetch failure from $task of $failedStage attempt " + - s"${task.stageAttemptId} when count ${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key} " + - s"as executor ${bmAddress.executorId} is decommissioned and " + - s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true") + logInfo(log"Ignoring fetch failure from ${MDC(TASK, task)} of ${MDC(STAGE, failedStage)}" + + log" attempt ${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} when count" + + log" ${MDC(KEY, config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} as executor " + + log"${MDC(EXECUTOR_ID, bmAddress.executorId)} is decommissioned and " + + log" ${MDC(KEY2, config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key)}=true") } else { failedStage.failedAttemptIds.add(task.stageAttemptId) } @@ -2019,13 +2029,13 @@ private[spark] class DAGScheduler( // multiple tasks running concurrently on different executors). In that case, it is // possible the fetch failure has already been handled by the scheduler. 
if (runningStages.contains(failedStage)) { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + - s"due to a fetch failure from $mapStage (${mapStage.name})") + logInfo(log"Marking ${MDC(STAGE, failedStage)} (${MDC(STAGE_NAME, failedStage.name)}) as failed " + + log"due to a fetch failure from ${MDC(STAGE2, mapStage)} (${MDC(STAGE_NAME2, mapStage.name)})") markStageAsFinished(failedStage, errorMessage = Some(failureMessage), willRetry = !shouldAbortStage) } else { - logDebug(s"Received fetch failure from $task, but it's from $failedStage which is no " + - "longer running") + logDebug(log"Received fetch failure from ${MDC(TASK, task)}," + + log" but it's from ${MDC(STAGE, failedStage)} which is no longer running") } if (mapStage.rdd.isBarrier()) { @@ -2148,9 +2158,9 @@ private[spark] class DAGScheduler( case _ => } - logInfo(s"The shuffle map stage $mapStage with indeterminate output was failed, " + - s"we will roll back and rerun below stages which include itself and all its " + - s"indeterminate child stages: $rollingBackStages") + logInfo(log"The shuffle map stage ${MDC(STAGE, mapStage)} with indeterminate output was failed," + + log" we will roll back and rerun below stages which include itself and all its" + + log" indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } // We expect one executor failure to trigger many FetchFailures in rapid succession, @@ -2162,8 +2172,8 @@ private[spark] class DAGScheduler( // producing a resubmit for each failed stage makes debugging and logging a little // simpler while not producing an overwhelming number of scheduler events. logInfo( - s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure" + log"Resubmitting ${MDC(STAGE, mapStage)} (${MDC(STAGE_NAME, mapStage.name)}) and " + + log"${MDC(STAGE2, failedStage)} (${MDC(STAGE_NAME, failedStage.name)}) due to fetch failure" ) messageScheduler.schedule( new Runnable { @@ -2223,12 +2233,12 @@ private[spark] class DAGScheduler( // Always fail the current stage and retry all the tasks when a barrier task fail. 
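// Illustrative sketch only: when two values of the same kind appear in one message, the patch
// uses the numbered key variants (STAGE / STAGE2, STAGE_NAME / STAGE_NAME2, KEY / KEY2 above)
// so both values remain distinct fields in the structured output.
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{STAGE, STAGE2, STAGE_NAME, STAGE_NAME2}

class TwoStageLogExample extends Logging {
  def markFailed(failed: String, failedName: String, map: String, mapName: String): Unit = {
    logInfo(log"Marking ${MDC(STAGE, failed)} (${MDC(STAGE_NAME, failedName)}) as failed " +
      log"due to a fetch failure from ${MDC(STAGE2, map)} (${MDC(STAGE_NAME2, mapName)})")
  }
}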
val failedStage = stageIdToStage(task.stageId) if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) { - logInfo(s"Ignoring task failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + - s"(attempt ${failedStage.latestInfo.attemptNumber()}) running") + logInfo(log"Ignoring task failure from ${MDC(TASK, task)} as it's from ${MDC(STAGE, failedStage)} attempt" + + log" ${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} and there is a more recent attempt for that stage " + + log"(attempt ${MDC(STAGE_ATTEMPT_NUMBER, failedStage.latestInfo.attemptNumber())}) running") } else { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + - "failed.") + logInfo(log"Marking ${MDC(STAGE, failedStage)} (${MDC(STAGE_NAME, failedStage.name)})" + + log"as failed due to a barrier task failed.") val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + failure.toErrorString try { @@ -2283,8 +2293,8 @@ private[spark] class DAGScheduler( val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage if (noResubmitEnqueued) { - logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " + - "failure.") + logInfo(log"Resubmitting ${MDC(STAGE, failedStage)} (${MDC(STAGE_NAME, failedStage.name)})" + + log"due to barrier stage failure.") messageScheduler.schedule(new Runnable { override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) @@ -2361,8 +2371,8 @@ private[spark] class DAGScheduler( // delay should be 0 and registerMergeResults should be true. assert(delay == 0 && registerMergeResults) if (task.getDelay(TimeUnit.NANOSECONDS) > 0 && task.cancel(false)) { - logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle merge immediately " + - s"after cancelling previously scheduled task.") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) scheduled for" + + log" finalizing shuffle merge immediately after cancelling previously scheduled task.") shuffleDep.setFinalizeTask( shuffleMergeFinalizeScheduler.schedule( new Runnable { @@ -2373,13 +2383,14 @@ private[spark] class DAGScheduler( ) ) } else { - logInfo(s"$stage (${stage.name}) existing scheduled task for finalizing shuffle merge" + - s"would either be in-progress or finished. No need to schedule shuffle merge" + - s" finalization again.") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) existing scheduled task" + + log" for finalizing shuffle merge would either be in-progress or finished. No need to" + + log" schedule shuffle merge finalization again.") } case None => // If no previous finalization task is scheduled, schedule the finalization task. 
- logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle merge in $delay s") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) scheduled for finalizing" + + log" shuffle merge in ${MDC(DELAY, delay)} s") shuffleDep.setFinalizeTask( shuffleMergeFinalizeScheduler.schedule( new Runnable { @@ -2408,8 +2419,8 @@ private[spark] class DAGScheduler( private[scheduler] def finalizeShuffleMerge( stage: ShuffleMapStage, registerMergeResults: Boolean = true): Unit = { - logInfo(s"$stage (${stage.name}) finalizing the shuffle merge with registering merge " + - s"results set to $registerMergeResults") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) finalizing the shuffle merge" + + log" with registering merge results set to ${MDC(REGISTER_MERGE_RESULT, registerMergeResults)}") val shuffleId = stage.shuffleDep.shuffleId val shuffleMergeId = stage.shuffleDep.shuffleMergeId val numMergers = stage.shuffleDep.getMergerLocs.length @@ -2479,8 +2490,8 @@ private[spark] class DAGScheduler( } catch { case _: TimeoutException => timedOut = true - logInfo(s"Timed out on waiting for merge results from all " + - s"$numMergers mergers for shuffle $shuffleId") + logInfo(log"Timed out on waiting for merge results from all " + + log"${MDC(NUM_MERGERS, numMergers)} mergers for shuffle ${MDC(SHUFFLE_ID, shuffleId)}") } finally { if (timedOut || !registerMergeResults) { cancelFinalizeShuffleMergeFutures(scheduledFutures, @@ -2511,9 +2522,9 @@ private[spark] class DAGScheduler( private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): Unit = { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") - logInfo("running: " + runningStages) - logInfo("waiting: " + waitingStages) - logInfo("failed: " + failedStages) + logInfo(log"running: ${MDC(STAGES, runningStages)}") + logInfo(log"waiting: ${MDC(STAGES, waitingStages)}") + logInfo(log"failed: ${MDC(STAGES, failedStages)}") // This call to increment the epoch may not be strictly necessary, but it is retained // for now in order to minimize the changes in behavior from an earlier version of the @@ -2529,9 +2540,9 @@ private[spark] class DAGScheduler( if (!shuffleStage.isAvailable) { // Some tasks had failed; let's resubmit this shuffleStage. 
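// Illustrative sketch only: messages with no variables stay on the plain String overload of
// logInfo, as with "looking for newly runnable stages" in processShuffleMapStageCompletion
// above; only parameterized messages move to the log interpolator.
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.STAGES

class StageSetLogExample extends Logging {
  def dump(running: Set[String]): Unit = {
    logInfo("looking for newly runnable stages")   // no variables: left unchanged
    logInfo(log"running: ${MDC(STAGES, running)}") // parameterized: structured
  }
}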
// TODO: Lower-level scheduler should also deal with this - logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + - ") because some of its tasks had failed: " + - shuffleStage.findMissingPartitions().mkString(", ")) + logInfo(log"Resubmitting ${MDC(STAGE, shuffleStage)} (${MDC(STAGE_NAME, shuffleStage.name)})" + + log" because some of its tasks had failed:" + + log" ${MDC(PARTITION_IDS, shuffleStage.findMissingPartitions().mkString(", "))}") submitStage(shuffleStage) } else { markMapStageJobsAsFinished(shuffleStage) @@ -2603,7 +2614,7 @@ private[spark] class DAGScheduler( } private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = { - logInfo(s"Resubmitted $task, so marking it as still running.") + logInfo(log"Resubmitted ${MDC(TASK, task)}, so marking it as still running.") stage match { case sms: ShuffleMapStage => sms.pendingPartitions += task.partitionId @@ -2679,7 +2690,7 @@ private[spark] class DAGScheduler( if (!isShuffleMerger && (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch)) { executorFailureEpoch(execId) = currentEpoch - logInfo(s"Executor lost: $execId (epoch $currentEpoch)") + logInfo(log"Executor lost: ${MDC(EXECUTOR_ID, execId)} (epoch ${MDC(EPOCH, currentEpoch)})") if (pushBasedShuffleEnabled) { // Remove fetchFailed host in the shuffle push merger list for push based shuffle hostToUnregisterOutputs.foreach( @@ -2703,10 +2714,10 @@ private[spark] class DAGScheduler( if (remove) { hostToUnregisterOutputs match { case Some(host) => - logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") + logInfo(log"Shuffle files lost for host: ${MDC(HOST, host)} (epoch ${MDC(EPOCH, currentEpoch)})") mapOutputTracker.removeOutputsOnHost(host) case None => - logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") + logInfo(log"Shuffle files lost for executor: ${MDC(EXECUTOR_ID, execId)} (epoch ${MDC(EPOCH, currentEpoch)})") mapOutputTracker.removeOutputsOnExecutor(execId) } } @@ -2728,7 +2739,7 @@ private[spark] class DAGScheduler( workerId: String, host: String, message: String): Unit = { - logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host)) + logInfo(log"Shuffle files lost for worker ${MDC(WORKER_ID, workerId)} on host ${MDC(HOST, host)}") mapOutputTracker.removeOutputsOnHost(host) clearCacheLocs() } @@ -2736,7 +2747,7 @@ private[spark] class DAGScheduler( private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = { // remove from executorFailureEpoch(execId) ? 
if (executorFailureEpoch.contains(execId)) { - logInfo("Host added was in lost list earlier: " + host) + logInfo(log"Host added was in lost list earlier: ${MDC(HOST, host)}") executorFailureEpoch -= execId } shuffleFileLostEpoch -= execId @@ -2749,10 +2760,10 @@ private[spark] class DAGScheduler( }.foreach { case (_, stage: ShuffleMapStage) => configureShufflePushMergerLocations(stage) if (stage.shuffleDep.getMergerLocs.nonEmpty) { - logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + - s" ${stage.shuffleDep.shuffleId} and shuffle merge" + - s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + - s" merger locations") + logInfo(log"Shuffle merge enabled adaptively for ${MDC(STAGE, stage)} with shuffle" + + log" ${MDC(SHUFFLE_ID, stage.shuffleDep.shuffleId)} and shuffle merge" + + log" ${MDC(SHUFFLE_MERGE_ID, stage.shuffleDep.shuffleMergeId)} with" + + log" ${MDC(NUM_MERGER_LOCATIONS, stage.shuffleDep.getMergerLocs.size)} merger locations") } } } @@ -2772,7 +2783,7 @@ private[spark] class DAGScheduler( handleJobCancellation(jobId, Option(reasonStr)) } case None => - logInfo("No active jobs to kill for Stage " + stageId) + logInfo(log"No active jobs to kill for Stage ${MDC(STAGE_ID, stageId)}") } } @@ -2795,11 +2806,11 @@ private[spark] class DAGScheduler( errorMessage: Option[String] = None, willRetry: Boolean = false): Unit = { val serviceTime = stage.latestInfo.submissionTime match { - case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) + case Some(t) => (clock.getTimeMillis() - t).toString case _ => "Unknown" } if (errorMessage.isEmpty) { - logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) finished in ${MDC(TOTAL_TIME, serviceTime)} ms") stage.latestInfo.completionTime = Some(clock.getTimeMillis()) // Clear failure count for this stage, now that it's succeeded. 
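// Illustrative sketch only: values that need a particular layout are formatted by the caller
// before being wrapped in MDC (the mkString of missing partitions and the elapsed-time string
// in markStageAsFinished above); MDC itself simply records whatever value it is given.
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{PARTITION_IDS, TOTAL_TIME}

class FormattedValueLogExample extends Logging {
  def resubmit(missing: Seq[Int], elapsedMs: Long): Unit = {
    logInfo(log"Resubmitting because some tasks had failed:" +
      log" ${MDC(PARTITION_IDS, missing.mkString(", "))}," +
      log" last attempt took ${MDC(TOTAL_TIME, elapsedMs)} ms")
  }
}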
@@ -2809,7 +2820,8 @@ private[spark] class DAGScheduler( stage.clearFailures() } else { stage.latestInfo.stageFailed(errorMessage.get) - logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) failed in" + + log" ${MDC(TOTAL_TIME, serviceTime)} ms due to ${MDC(EXCEPTION, errorMessage.get)}") } updateStageInfoForPushBasedShuffle(stage) if (!willRetry) { @@ -2855,7 +2867,7 @@ private[spark] class DAGScheduler( failJobAndIndependentStages(job, finalException) } if (dependentJobs.isEmpty) { - logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") + logInfo(log"Ignoring failure of ${MDC(STAGE, failedStage)} because all jobs depending on it are done") } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 572e69885325..16a2f4fb6cad 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -566,8 +566,9 @@ class ExternalAppendOnlyMap[K, V, C]( if (hasSpilled) { false } else { - logInfo(s"Task ${context.taskAttemptId()} force spilling in-memory map to disk and " + - s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") + logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, context.taskAttemptId())} force spilling" + + log" in-memory map to disk and it will release " + + log"${MDC(NUM_BYTES, org.apache.spark.util.Utils.bytesToString(getUsed()))} memory") val nextUpstream = spillMemoryIteratorToDisk(upstream) assert(!upstream.hasNext) hasSpilled = true From 85a560f9f75c34556c82b7bb4e792b2390aac6c9 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Fri, 24 May 2024 08:39:25 +1000 Subject: [PATCH 02/12] Migrate to structured logging --- .../org/apache/spark/internal/LogKey.scala | 14 ++++++ .../spark/scheduler/HealthTracker.scala | 40 +++++++++-------- .../scheduler/OutputCommitCoordinator.scala | 17 +++++--- .../spark/scheduler/SchedulableBuilder.scala | 28 +++++++----- .../spark/scheduler/StatsReportListener.scala | 22 ++++++---- .../spark/scheduler/TaskSetExcludeList.scala | 8 ++-- .../CoarseGrainedSchedulerBackend.scala | 43 +++++++++++-------- .../cluster/StandaloneSchedulerBackend.scala | 18 ++++---- .../scheduler/dynalloc/ExecutorMonitor.scala | 21 +++++---- 9 files changed, 129 insertions(+), 82 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 5e492a2decb8..561f5e49c364 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -152,6 +152,7 @@ object LogKeys { case object DATA_FILE_NUM extends LogKey case object DATA_SOURCE extends LogKey case object DATA_SOURCES extends LogKey + case object DECOMMISSION_INFO extends LogKey case object DEFAULT_COMPACTION_INTERVAL extends LogKey case object DEFAULT_COMPACT_INTERVAL extends LogKey case object DEFAULT_ISOLATION_LEVEL extends LogKey @@ -260,6 +261,7 @@ object LogKeys { case object HASH_JOIN_KEYS extends LogKey case object HASH_MAP_SIZE extends LogKey case object HAS_R_PACKAGE extends LogKey + case object HEADING extends LogKey case object HEARTBEAT extends LogKey case object HEARTBEAT_INTERVAL extends LogKey case object HISTORY_DIR 
extends LogKey @@ -331,6 +333,7 @@ object LogKeys { case object LOGICAL_PLAN_LEAVES extends LogKey case object LOG_ID extends LogKey case object LOG_KEY_FILE extends LogKey + case object LOG_LEVEL extends LogKey case object LOG_OFFSET extends LogKey case object LOG_TYPE extends LogKey case object LOWER_BOUND extends LogKey @@ -374,6 +377,7 @@ object LogKeys { case object MINI_BATCH_FRACTION extends LogKey case object MIN_COMPACTION_BATCH_ID extends LogKey case object MIN_FREQUENT_PATTERN_COUNT extends LogKey + case object MIN_REGISTER_RATIO extends LogKey case object MIN_POINT_PER_CLUSTER extends LogKey case object MIN_SHARE extends LogKey case object MIN_SIZE extends LogKey @@ -391,6 +395,7 @@ object LogKeys { case object NEW_VALUE extends LogKey case object NEXT_RENEWAL_TIME extends LogKey case object NODES extends LogKey + case object NODE_IDS extends LogKey case object NODE_LOCATION extends LogKey case object NON_BUILT_IN_CONNECTORS extends LogKey case object NORM extends LogKey @@ -421,6 +426,7 @@ object LogKeys { case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey case object NUM_EVENTS extends LogKey case object NUM_EXAMPLES extends LogKey + case object NUM_EXECUTORS extends LogKey case object NUM_EXECUTOR_CORES extends LogKey case object NUM_EXECUTOR_CORES_REMAINING extends LogKey case object NUM_EXECUTOR_CORES_TOTAL extends LogKey @@ -431,6 +437,7 @@ object LogKeys { case object NUM_FILES_FAILED_TO_DELETE extends LogKey case object NUM_FILES_REUSED extends LogKey case object NUM_FREQUENT_ITEMS extends LogKey + case object NUM_GRACEFULLY_DECOMMISSIONED extends LogKey case object NUM_INDEX_FILES extends LogKey case object NUM_ITERATIONS extends LogKey case object NUM_LEADING_SINGULAR_VALUES extends LogKey @@ -469,6 +476,8 @@ object LogKeys { case object NUM_SUB_DIRS extends LogKey case object NUM_TASKS extends LogKey case object NUM_TASK_CPUS extends LogKey + case object NUM_UNFINISHED_DECOMMISSION extends LogKey + case object NUM_UNEXPECTEDLY_EXIT extends LogKey case object NUM_VERSIONS_RETAIN extends LogKey case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey case object OBJECT_ID extends LogKey @@ -506,6 +515,7 @@ object LogKeys { case object PATHS extends LogKey case object PEER extends LogKey case object PERCENT extends LogKey + case object PERCENTILE_HEADER extends LogKey case object PIPELINE_STAGE_UID extends LogKey case object PLUGIN_NAME extends LogKey case object POD_COUNT extends LogKey @@ -541,6 +551,7 @@ object LogKeys { case object PYTHON_WORKER_MODULE extends LogKey case object PYTHON_WORKER_RESPONSE extends LogKey case object QUERY_CACHE_VALUE extends LogKey + case object QUANTILES extends LogKey case object QUERY_HINT extends LogKey case object QUERY_ID extends LogKey case object QUERY_PLAN extends LogKey @@ -660,12 +671,14 @@ object LogKeys { case object STAGE_NAME extends LogKey case object STAGE_NAME2 extends LogKey case object STAGE_PARENTS extends LogKey + case object STAGE_STATUS extends LogKey case object START_INDEX extends LogKey case object STATEMENT_ID extends LogKey case object STATE_STORE_ID extends LogKey case object STATE_STORE_PROVIDER extends LogKey case object STATE_STORE_VERSION extends LogKey case object STATUS extends LogKey + case object STAT_COUNTER extends LogKey case object STDERR extends LogKey case object STOP_SITE_SHORT_FORM extends LogKey case object STORAGE_LEVEL extends LogKey @@ -700,6 +713,7 @@ object LogKeys { case object TASK_NAME extends LogKey case object TASK_REQUIREMENTS extends LogKey case object 
TASK_RESOURCES extends LogKey + case object TASK_SET_MANAGER_NAME extends LogKey case object TASK_SET_NAME extends LogKey case object TASK_STATE extends LogKey case object TEMP_FILE extends LogKey diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala index cecf5d498ac4..df5190ceb78b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala @@ -111,8 +111,8 @@ private[scheduler] class HealthTracker ( val execsToInclude = executorIdToExcludedStatus.filter(_._2.expiryTime < now).keys if (execsToInclude.nonEmpty) { // Include any executors that have been excluded longer than the excludeOnFailure timeout. - logInfo(s"Removing executors $execsToInclude from exclude list because the " + - s"the executors have reached the timed out") + logInfo(log"Removing executors ${MDC(EXECUTOR_IDS, execsToInclude)} from exclude" + + log" list because the executors have reached the timed out") execsToInclude.foreach { exec => val status = executorIdToExcludedStatus.remove(exec).get val failedExecsOnNode = nodeToExcludedExecs(status.node) @@ -128,8 +128,8 @@ private[scheduler] class HealthTracker ( val nodesToInclude = nodeIdToExcludedExpiryTime.filter(_._2 < now).keys if (nodesToInclude.nonEmpty) { // Include any nodes that have been excluded longer than the excludeOnFailure timeout. - logInfo(s"Removing nodes $nodesToInclude from exclude list because the " + - s"nodes have reached has timed out") + logInfo(log"Removing nodes ${MDC(NODE_IDS, nodesToInclude)} from exclude" + + log" list because the nodes have reached has timed out") nodesToInclude.foreach { node => nodeIdToExcludedExpiryTime.remove(node) // post both to keep backwards compatibility @@ -158,23 +158,23 @@ private[scheduler] class HealthTracker ( private def killExecutor(exec: String, msg: String): Unit = { val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) { - s"${msg} (actually decommissioning)" + log"${MDC(MESSAGE, msg)} (actually decommissioning)" } else { - msg + log"$MDC(MESSAGE, msg)" } allocationClient match { case Some(a) => logInfo(fullMsg) if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) { - a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg), + a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg.message), adjustTargetNumExecutors = false) } else { a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, force = true) } case None => - logInfo(s"Not attempting to kill excluded executor id $exec " + - s"since allocation client is not defined.") + logInfo(log"Not attempting to kill excluded executor id ${MDC(EXECUTOR_ID, exec)} " + + log"since allocation client is not defined.") } } @@ -196,14 +196,14 @@ private[scheduler] class HealthTracker ( allocationClient match { case Some(a) => if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) { - logInfo(s"Decommissioning all executors on excluded host $node " + - s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.") + logInfo(log"Decommissioning all executors on excluded host ${MDC(HOST, node)} " + + log"since ${MDC(KEY, config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key)} is set.") if (!a.decommissionExecutorsOnHost(node)) { logError(log"Decommissioning executors on ${MDC(HOST, node)} failed.") } } else { - logInfo(s"Killing all executors on excluded host $node " + - s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.") + logInfo(log"Killing all 
executors on excluded host ${MDC(HOST, node)} " + + log"since ${MDC(KEY, config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key)} is set.") if (!a.killExecutorsOnHost(node)) { logError(log"Killing executors on node ${MDC(HOST, node)} failed.") } @@ -231,7 +231,8 @@ private[scheduler] class HealthTracker ( if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) { if (!nodeIdToExcludedExpiryTime.contains(host)) { - logInfo(s"excluding node $host due to fetch failure of external shuffle service") + logInfo(log"excluding node ${MDC(HOST, host)} due to fetch failure of" + + log" external shuffle service") nodeIdToExcludedExpiryTime.put(host, expiryTimeForNewExcludes) // post both to keep backwards compatibility @@ -242,7 +243,7 @@ private[scheduler] class HealthTracker ( updateNextExpiryTime() } } else if (!executorIdToExcludedStatus.contains(exec)) { - logInfo(s"Excluding executor $exec due to fetch failure") + logInfo(log"Excluding executor ${MDC(EXECUTOR_ID, exec)} due to fetch failure") executorIdToExcludedStatus.put(exec, ExcludedExecutor(host, expiryTimeForNewExcludes)) // We hardcoded number of failure tasks to 1 for fetch failure, because there's no @@ -280,8 +281,8 @@ private[scheduler] class HealthTracker ( // some of the logic around expiry times a little more confusing. But it also wouldn't be a // problem to re-exclude, with a later expiry time. if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToExcludedStatus.contains(exec)) { - logInfo(s"Excluding executor id: $exec because it has $newTotal" + - s" task failures in successful task sets") + logInfo(log"Excluding executor id: ${MDC(EXECUTOR_ID, exec)} because" + + log" it has ${MDC(NUM_FAILURES, newTotal)} task failures in successful task sets") val node = failuresInTaskSet.node executorIdToExcludedStatus.put(exec, ExcludedExecutor(node, expiryTimeForNewExcludes)) // post both to keep backwards compatibility @@ -299,8 +300,9 @@ private[scheduler] class HealthTracker ( // time. 
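// Illustrative sketch only: a call site that needs a key LogKeys does not yet define first adds
// a `case object ... extends LogKey` entry (as in the LogKey.scala hunk of this patch) and then
// tags the value with it; keys can be referenced through an import or the LogKeys prefix.
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys

class NodeExcludeLogExample extends Logging {
  def excluded(node: String, numExcluded: Int): Unit = {
    logInfo(log"Excluding node ${MDC(LogKeys.HOST, node)} because it has" +
      log" ${MDC(LogKeys.NUM_EXECUTORS, numExcluded)} executors excluded")
  }
}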
if (excludedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE && !nodeIdToExcludedExpiryTime.contains(node)) { - logInfo(s"Excluding node $node because it has ${excludedExecsOnNode.size} " + - s"executors excluded: ${excludedExecsOnNode}") + logInfo(log"Excluding node ${MDC(HOST, node)} because it has" + + log" ${MDC(NUM_EXECUTORS, excludedExecsOnNode.size)} " + + log"executors excluded: ${MDC(EXECUTOR_IDS, excludedExecsOnNode)}") nodeIdToExcludedExpiryTime.put(node, expiryTimeForNewExcludes) // post both to keep backwards compatibility listenerBus.post(SparkListenerNodeBlacklisted(now, node, excludedExecsOnNode.size)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index cd5d6b8f9c90..755e9886784a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -20,7 +20,8 @@ package org.apache.spark.scheduler import scala.collection.mutable import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.util.{RpcUtils, ThreadUtils} @@ -124,7 +125,7 @@ private[spark] class OutputCommitCoordinator( stageStates.get(stage) match { case Some(state) => require(state.authorizedCommitters.length == maxPartitionId + 1) - logInfo(s"Reusing state from previous attempt of stage $stage.") + logInfo(log"Reusing state from previous attempt of stage ${MDC(STAGE, stage)}.") case _ => stageStates(stage) = new StageState(maxPartitionId + 1) @@ -151,8 +152,10 @@ private[spark] class OutputCommitCoordinator( case Success => // The task output has been committed successfully case _: TaskCommitDenied => - logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " + - s"partition: $partition, attempt: $attemptNumber") + logInfo(log"Task was denied committing, stage:" + + log" ${MDC(STAGE, stage)}.${MDC(STAGE_ATTEMPT, stageAttempt)}, " + + log"partition: ${MDC(PARTITION_ID, partition)}," + + log" attempt: ${MDC(STAGE_ATTEMPT_NUMBER, attemptNumber)}") case _ => // Mark the attempt as failed to exclude from future commit protocol val taskId = TaskIdentifier(stageAttempt, attemptNumber) @@ -182,8 +185,10 @@ private[spark] class OutputCommitCoordinator( attemptNumber: Int): Boolean = synchronized { stageStates.get(stage) match { case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => - logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + - s"task attempt $attemptNumber already marked as failed.") + logInfo(log"Commit denied for" + + log" stage=${MDC(STAGE, stage)}.${MDC(STAGE_ATTEMPT, stageAttempt)}," + + log" partition=${MDC(PARTITION_ID, partition)}: " + + log"task attempt ${MDC(STAGE_ATTEMPT_NUMBER, attemptNumber)} already marked as failed.") false case Some(state) => val existing = state.authorizedCommitters(partition) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index b5cc6261cea3..90506ab8357e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -80,20 +80,23 @@ private[spark] class 
FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext fileData = schedulerAllocFile.map { f => val filePath = new Path(f) val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath) - logInfo(s"Creating Fair Scheduler pools from $f") + logInfo(log"Creating Fair Scheduler pools from ${MDC(PATH, f)}") Some((fis, f)) }.getOrElse { val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) if (is != null) { - logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE") + logInfo(log"Creating Fair Scheduler pools from default file:" + + log" ${MDC(PATH, DEFAULT_SCHEDULER_FILE)}") Some((is, DEFAULT_SCHEDULER_FILE)) } else { val schedulingMode = SchedulingMode.withName(sc.conf.get(SCHEDULER_MODE)) rootPool.addSchedulable(new Pool( DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) - logInfo("Fair scheduler configuration not found, created default pool: " + - "%s, schedulingMode: %s, minShare: %d, weight: %d".format( - DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + logInfo(log"Fair scheduler configuration not found, created default pool: " + + log"${MDC(POOL_NAME, DEFAULT_POOL_NAME)}," + + log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, schedulingMode)}," + + log" minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}," + + log" weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") None } } @@ -122,8 +125,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(pool) - logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( - DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + logInfo(log"Created default pool: ${MDC(POOL_NAME, DEFAULT_POOL_NAME)}," + + log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}," + + log" minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}," + + log" weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") } } @@ -142,8 +147,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, schedulingMode, minShare, weight)) + logInfo(log"Created pool: ${MDC(POOL_NAME, poolName)}," + + log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, schedulingMode)}," + + log" minShare: ${MDC(MIN_SHARE, minShare)}," + + log" weight: ${MDC(WEIGHT, weight)}") } } @@ -220,6 +227,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") } parentPool.addSchedulable(manager) - logInfo("Added task set " + manager.name + " tasks to pool " + poolName) + logInfo(log"Added task set ${MDC(TASK_SET_MANAGER_NAME, manager.name)} tasks" + + log" to pool ${MDC(POOL_NAME, poolName)}") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala index 1f12b46412bc..54677b92e773 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala @@ -21,7 +21,8 @@ import scala.collection.mutable import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.Logging 
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.util.{Distribution, Utils} @@ -46,7 +47,7 @@ class StatsReportListener extends SparkListener with Logging { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { implicit val sc = stageCompleted - this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}") + this.logInfo(log"Finished stage: " + getStatusDetail(stageCompleted.stageInfo)) showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics.toSeq) // Shuffle write @@ -73,15 +74,18 @@ class StatsReportListener extends SparkListener with Logging { taskInfoMetrics.clear() } - private def getStatusDetail(info: StageInfo): String = { + private def getStatusDetail(info: StageInfo): MessageWithContext = { val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("") val timeTaken = info.submissionTime.map( x => info.completionTime.getOrElse(System.currentTimeMillis()) - x ).getOrElse("-") - s"Stage(${info.stageId}, ${info.attemptNumber()}); Name: '${info.name}'; " + - s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " + - s"Took: $timeTaken msec" + log"Stage(${MDC(STAGE_ID, info.stageId)}," + + log" ${MDC(STAGE_ATTEMPT_NUMBER, info.attemptNumber())});" + + log" Name: '${MDC(STAGE_NAME, info.name)}'; " + + log"Status: ${MDC(STAGE_STATUS, info.getStatusString)} " + + log"${MDC(REASON, failureReason)}; numTasks: ${MDC(NUM_TASKS, info.numTasks)}; " + + log"Took: ${MDC(TOTAL_TIME, timeTaken)} msec" } } @@ -111,9 +115,9 @@ private[spark] object StatsReportListener extends Logging { def showDistribution(heading: String, d: Distribution, formatNumber: Double => String): Unit = { val stats = d.statCounter val quantiles = d.getQuantiles(probabilities).map(formatNumber) - logInfo(heading + stats) - logInfo(percentilesHeader) - logInfo("\t" + quantiles.mkString("\t")) + logInfo(log"${MDC(HEADING, heading)}${MDC(STAT_COUNTER, stats)}") + logInfo(log"${MDC(PERCENTILE_HEADER, percentilesHeader)}") + logInfo(log"\t${MDC(QUANTILES, quantiles.mkString("\t"))}") } def showDistribution( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala index f479e5e32bc2..f8dd60b8933e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala @@ -19,7 +19,8 @@ package org.apache.spark.scheduler import scala.collection.mutable.{HashMap, HashSet} import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{EXECUTOR_ID, HOST, STAGE_ID} import org.apache.spark.internal.config import org.apache.spark.util.Clock @@ -134,7 +135,8 @@ private[scheduler] class TaskSetExcludelist( val numFailures = execFailures.numUniqueTasksWithFailures if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) { if (excludedExecs.add(exec)) { - logInfo(s"Excluding executor ${exec} for stage $stageId") + logInfo(log"Excluding executor ${MDC(EXECUTOR_ID, exec)} for" + + log" stage ${MDC(STAGE_ID, stageId)}") // This executor has been excluded for this stage. Let's check if it // the whole node should be excluded. 
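// Illustrative sketch only: helpers that used to build a String can return a MessageWithContext
// instead, as getStatusDetail above now does, so the MDC context survives until the final
// logInfo call; .message still yields the plain rendered text where only a String is wanted,
// as in the HealthTracker killExecutor change earlier in this patch.
import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys.{STAGE_ID, STAGE_STATUS}

class StatusDetailExample extends Logging {
  private def statusDetail(stageId: Int, status: String): MessageWithContext =
    log"Stage(${MDC(STAGE_ID, stageId)});" + log" Status: ${MDC(STAGE_STATUS, status)}"

  def finished(stageId: Int, status: String): Unit = {
    logInfo(log"Finished stage: " + statusDetail(stageId, status))
    logDebug(statusDetail(stageId, status).message) // plain text, no structured context
  }
}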
val excludedExecutorsOnNode = @@ -149,7 +151,7 @@ private[scheduler] class TaskSetExcludelist( val numFailExec = excludedExecutorsOnNode.size if (numFailExec >= MAX_FAILED_EXEC_PER_NODE_STAGE) { if (excludedNodes.add(host)) { - logInfo(s"Excluding ${host} for stage $stageId") + logInfo(log"Excluding ${MDC(HOST, host)} for stage ${MDC(STAGE_ID, stageId)}") // SparkListenerNodeBlacklistedForStage is deprecated but post both events // to keep backward compatibility listenerBus.post( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d359b65caa93..73cd1d4ba69c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -258,7 +258,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // If the cluster manager gives us an executor on an excluded node (because it // already started allocating those resources before we informed it of our exclusion, // or if it ignored our exclusion), then we reject that executor immediately. - logInfo(s"Rejecting $executorId as it has been excluded.") + logInfo(log"Rejecting ${MDC(LogKeys.EXECUTOR_ID, executorId)} as it has been excluded.") context.sendFailure( new IllegalStateException(s"Executor is excluded due to failures: $executorId")) } else { @@ -269,8 +269,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } else { context.senderAddress } - logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId, " + - s" ResourceProfileId $resourceProfileId") + logInfo(log"Registered executor ${MDC(RPC_ENDPOINT_REF, executorRef)}" + + log" (${MDC(RPC_ADDRESS, executorAddress)})" + + log" with ID ${MDC(LogKeys.EXECUTOR_ID, executorId)}," + + log" ResourceProfileId ${MDC(RESOURCE_PROFILE_ID, resourceProfileId)}") addressToExecutorId(executorAddress) = executorId totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) @@ -324,7 +326,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case UpdateExecutorsLogLevel(logLevel) => currentLogLevel = Some(logLevel) - logInfo(s"Asking each executor to refresh the log level to $logLevel") + logInfo(log"Asking each executor to refresh the log level to ${MDC(LOG_LEVEL, logLevel)}") for ((_, executorData) <- executorDataMap) { executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel)) } @@ -497,7 +499,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // forever. Therefore, we should also post `SparkListenerExecutorRemoved` here. 
listenerBus.post(SparkListenerExecutorRemoved( System.currentTimeMillis(), executorId, reason.toString)) - logInfo(s"Asked to remove non-existent executor $executorId") + logInfo(log"Asked to remove non-existent executor" + + log" ${MDC(LogKeys.EXECUTOR_ID, executorId)}") } } @@ -526,7 +529,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - logInfo(s"Disabling executor $executorId.") + logInfo(log"Disabling executor ${MDC(LogKeys.EXECUTOR_ID, executorId)}.") scheduler.executorLost(executorId, LossReasonPending) } @@ -570,7 +573,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp return executorsToDecommission.toImmutableArraySeq } - logInfo(s"Decommission executors: ${executorsToDecommission.mkString(", ")}") + logInfo(log"Decommission executors:" + + log" ${MDC(EXECUTOR_IDS, executorsToDecommission.mkString(", "))}") // If we don't want to replace the executors we are decommissioning if (adjustTargetNumExecutors) { @@ -589,7 +593,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (!triggeredByExecutor) { executorsToDecommission.foreach { executorId => - logInfo(s"Notify executor $executorId to decommission.") + logInfo(log"Notify executor ${MDC(LogKeys.EXECUTOR_ID, executorId)} to decommission.") executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor) } } @@ -601,7 +605,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorsToDecommission.filter(executorsPendingDecommission.contains) } if (stragglers.nonEmpty) { - logInfo(s"${stragglers.toList} failed to decommission in ${cleanupInterval}, killing.") + logInfo(log"${MDC(EXECUTOR_IDS, stragglers.toList)} failed to decommission" + + log" in ${MDC(INTERVAL, cleanupInterval)}, killing.") killExecutors(stragglers.toImmutableArraySeq, false, false, true) } } @@ -718,13 +723,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def isReady(): Boolean = { if (sufficientResourcesRegistered()) { - logInfo("SchedulerBackend is ready for scheduling beginning after " + - s"reached minRegisteredResourcesRatio: $minRegisteredRatio") + logInfo(log"SchedulerBackend is ready for scheduling beginning after " + + log"reached minRegisteredResourcesRatio: ${MDC(MIN_REGISTER_RATIO, minRegisteredRatio)}") return true } if ((System.nanoTime() - createTimeNs) >= maxRegisteredWaitingTimeNs) { - logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeNs(ns)") + logInfo(log"SchedulerBackend is ready for scheduling beginning after waiting " + + log"maxRegisteredResourcesWaitingTime: ${MDC(TIME_UNITS, maxRegisteredWaitingTimeNs)}(ns)") return true } false @@ -801,7 +806,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp "Attempted to request a negative number of additional executor(s) " + s"$numAdditionalExecutors from the cluster manager. 
Please specify a positive number!") } - logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") + logInfo(log"Requesting ${MDC(NUM_EXECUTORS, numAdditionalExecutors)}" + + log" additional executor(s) from the cluster manager") val response = synchronized { val defaultProf = scheduler.sc.resourceProfileManager.defaultResourceProfile @@ -951,7 +957,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp adjustTargetNumExecutors: Boolean, countFailures: Boolean, force: Boolean): Seq[String] = { - logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") + logInfo(log"Requesting to kill executor(s) ${MDC(EXECUTOR_IDS, executorIds.mkString(", "))}") val response = withLock { val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) @@ -966,7 +972,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => force || !scheduler.isExecutorBusy(id) } executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures } - logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}") + logInfo(log"Actual list of executor(s) to be killed is" + + log" ${MDC(EXECUTOR_IDS, executorsToKill.mkString(", "))}") // If we do not wish to replace the executors we kill, sync the target number of executors // with the cluster manager to avoid allocating new ones. When computing the new target, @@ -1007,7 +1014,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the decommission request is acknowledged. */ final override def decommissionExecutorsOnHost(host: String): Boolean = { - logInfo(s"Requesting to kill any and all executors on host $host") + logInfo(log"Requesting to kill any and all executors on host ${MDC(HOST, host)}") // A potential race exists if a new executor attempts to register on a host // that is on the exclude list and is no longer valid. To avoid this race, // all executor registration and decommissioning happens in the event loop. This way, either @@ -1023,7 +1030,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. */ final override def killExecutorsOnHost(host: String): Boolean = { - logInfo(s"Requesting to kill any and all executors on host $host") + logInfo(log"Requesting to kill any and all executors on host ${MDC(HOST, host)}") // A potential race exists if a new executor attempts to register on a host // that is on the exclude list and is no longer valid. To avoid this race, // all executor registration and killing happens in the event loop. 
This way, either diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 8f15dec6739a..8ac724c76d86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -28,7 +28,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKeys.REASON +import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config.EXECUTOR_REMOVE_DELAY import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} @@ -145,7 +145,7 @@ private[spark] class StandaloneSchedulerBackend( } override def connected(appId: String): Unit = { - logInfo("Connected to Spark cluster with app ID " + appId) + logInfo(log"Connected to Spark cluster with app ID ${MDC(APP_ID, appId)}") this.appId = appId notifyContext() launcherBackend.setAppId(appId) @@ -174,8 +174,9 @@ private[spark] class StandaloneSchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit = { - logInfo("Granted executor ID %s on hostPort %s with %d core(s), %s RAM".format( - fullId, hostPort, cores, Utils.megabytesToString(memory))) + logInfo(log"Granted executor ID ${MDC(EXECUTOR_ID, fullId)}" + + log" on hostPort ${MDC(HOST_PORT, hostPort)} with ${MDC(NUM_CORES, cores)} core(s)," + + log" ${MDC(MEMORY_SIZE, Utils.megabytesToString(memory))} RAM") } override def executorRemoved( @@ -192,23 +193,24 @@ private[spark] class StandaloneSchedulerBackend( case Some(code) => ExecutorExited(code, exitCausedByApp = true, message) case None => ExecutorProcessLost(message, workerHost, causedByApp = workerHost.isEmpty) } - logInfo("Executor %s removed: %s".format(fullId, message)) + logInfo(log"Executor ${MDC(EXECUTOR_ID, fullId)} removed: ${MDC(MESSAGE, message)}") removeExecutor(fullId.split("/")(1), reason) } override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - logInfo(s"Asked to decommission executor $fullId") + logInfo(log"Asked to decommission executor ${MDC(EXECUTOR_ID, fullId)}") val execId = fullId.split("/")(1) decommissionExecutors( Array((execId, decommissionInfo)), adjustTargetNumExecutors = false, triggeredByExecutor = false) - logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) + logInfo(log"Executor ${MDC(EXECUTOR_ID, fullId)}" + + log" decommissioned: ${MDC(DECOMMISSION_INFO, decommissionInfo )}") } override def workerRemoved(workerId: String, host: String, message: String): Unit = { - logInfo("Worker %s removed: %s".format(workerId, message)) + logInfo(log"Worker ${MDC(WORKER_ID, workerId)} removed: ${MDC(MESSAGE, message)}") removeWorker(workerId, host, message) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index c389b0c988f4..16062f152446 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -19,13 +19,12 @@ package org.apache.spark.scheduler.dynalloc import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong - import scala.collection.mutable import scala.jdk.CollectionConverters._ - import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{EXECUTOR_ID, NUM_DRIVERS, NUM_EXECUTORS, NUM_GRACEFULLY_DECOMMISSIONED, NUM_UNEXPECTEDLY_EXIT, NUM_UNFINISHED_DECOMMISSION} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ @@ -342,7 +341,8 @@ private[spark] class ExecutorMonitor( override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId) exec.updateRunningTasks(0) - logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})") + logInfo(log"New executor ${MDC(EXECUTOR_ID, event.executorId)} has registered" + + log" (new total is ${MDC(NUM_EXECUTORS, executors.size())})") } private def decrementExecResourceProfileCount(rpId: Int): Unit = { @@ -365,11 +365,14 @@ private[spark] class ExecutorMonitor( } else { metrics.exitedUnexpectedly.inc() } - logInfo(s"Executor ${event.executorId} is removed. Remove reason statistics: (" + - s"gracefully decommissioned: ${metrics.gracefullyDecommissioned.getCount()}, " + - s"decommision unfinished: ${metrics.decommissionUnfinished.getCount()}, " + - s"driver killed: ${metrics.driverKilled.getCount()}, " + - s"unexpectedly exited: ${metrics.exitedUnexpectedly.getCount()}).") + logInfo(log"Executor ${MDC(EXECUTOR_ID, event.executorId)} is removed. 
Remove reason" + + log" statistics: (gracefully decommissioned:" + + log" ${MDC(NUM_GRACEFULLY_DECOMMISSIONED, metrics.gracefullyDecommissioned.getCount())}," + + log" decommision unfinished:" + + log" ${MDC(NUM_UNFINISHED_DECOMMISSION, metrics.decommissionUnfinished.getCount())}," + + log" driver killed: ${MDC(NUM_DRIVERS, metrics.driverKilled.getCount())}," + + log" unexpectedly exited:" + + log" ${MDC(NUM_UNEXPECTEDLY_EXIT, metrics.exitedUnexpectedly.getCount())}).") if (!removed.pendingRemoval || !removed.decommissioning) { nextTimeout.set(Long.MinValue) } From 06e7f7efb8be4d4d4815a705bd9dc39b5f3d4d9f Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Fri, 24 May 2024 09:10:28 +1000 Subject: [PATCH 03/12] Fix formatting --- .../apache/spark/deploy/history/HistoryServerDiskManager.scala | 3 ++- .../org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala index 56faa75962e6..122ed299242f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala @@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.config.History import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.History.HybridStoreDiskBackend.ROCKSDB import org.apache.spark.status.KVUtils @@ -58,7 +59,7 @@ private class HistoryServerDiskManager( throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).") } private val extension = - if (conf.get(HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb" + if (conf.get(History.HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb" private val tmpStoreDir = new File(path, "temp") if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 16062f152446..f559a55dddc1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -19,8 +19,10 @@ package org.apache.spark.scheduler.dynalloc import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong + import scala.collection.mutable import scala.jdk.CollectionConverters._ + import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.{Logging, MDC} From ed4624e00cf3deb05cb20a8485d8355167d4ee13 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Fri, 24 May 2024 10:52:34 +1000 Subject: [PATCH 04/12] Fix formatting --- .../main/scala/org/apache/spark/scheduler/HealthTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala index df5190ceb78b..910afd79673d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala @@ -160,7 +160,7 @@ 
private[scheduler] class HealthTracker ( val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) { log"${MDC(MESSAGE, msg)} (actually decommissioning)" } else { - log"$MDC(MESSAGE, msg)" + log"${MDC(MESSAGE, msg)}" } allocationClient match { case Some(a) => From 9ad231bbd75756ab3e576466447387650209f451 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Fri, 24 May 2024 11:51:43 +1000 Subject: [PATCH 05/12] Order LogKeys --- .../org/apache/spark/internal/LogKey.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 35db39ec144b..7233b8f55182 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -50,8 +50,8 @@ object LogKeys { case object APP_STATE extends LogKey case object ARCHIVE_NAME extends LogKey case object ARGS extends LogKey - case object AUTH_ENABLED extends LogKey case object ATTRIBUTE_MAP extends LogKey + case object AUTH_ENABLED extends LogKey case object BACKUP_FILE extends LogKey case object BARRIER_EPOCH extends LogKey case object BARRIER_ID extends LogKey @@ -364,9 +364,9 @@ object LogKeys { case object MINI_BATCH_FRACTION extends LogKey case object MIN_COMPACTION_BATCH_ID extends LogKey case object MIN_FREQUENT_PATTERN_COUNT extends LogKey - case object MIN_REGISTER_RATIO extends LogKey case object MIN_NUM_FREQUENT_PATTERN extends LogKey case object MIN_POINT_PER_CLUSTER extends LogKey + case object MIN_REGISTER_RATIO extends LogKey case object MIN_SHARE extends LogKey case object MIN_SIZE extends LogKey case object MIN_TIME extends LogKey @@ -476,9 +476,9 @@ object LogKeys { case object NUM_SUB_DIRS extends LogKey case object NUM_TASKS extends LogKey case object NUM_TASK_CPUS extends LogKey + case object NUM_TRAIN_WORD extends LogKey case object NUM_UNEXPECTEDLY_EXIT extends LogKey case object NUM_UNFINISHED_DECOMMISSION extends LogKey - case object NUM_TRAIN_WORD extends LogKey case object NUM_VERSIONS_RETAIN extends LogKey case object NUM_WEIGHTED_EXAMPLES extends LogKey case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey @@ -546,8 +546,8 @@ object LogKeys { case object PYTHON_VERSION extends LogKey case object PYTHON_WORKER_MODULE extends LogKey case object PYTHON_WORKER_RESPONSE extends LogKey - case object QUERY_CACHE_VALUE extends LogKey case object QUANTILES extends LogKey + case object QUERY_CACHE_VALUE extends LogKey case object QUERY_HINT extends LogKey case object QUERY_ID extends LogKey case object QUERY_PLAN extends LogKey @@ -556,8 +556,8 @@ object LogKeys { case object QUERY_PLAN_LENGTH_MAX extends LogKey case object QUERY_RUN_ID extends LogKey case object RANGE extends LogKey - case object RDD_CREATION_SITE extends LogKey case object RDD_CHECKPOINT_DIR extends LogKey + case object RDD_CREATION_SITE extends LogKey case object RDD_DEBUG_STRING extends LogKey case object RDD_DESCRIPTION extends LogKey case object RDD_ID extends LogKey @@ -573,8 +573,8 @@ object LogKeys { case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey case object REGEX extends LogKey - case object REGISTER_MERGE_RESULT extends LogKey case object REGISTERED_EXECUTOR_FILE extends LogKey + case object REGISTER_MERGE_RESULT extends LogKey case object RELATION_NAME extends LogKey case object RELATION_OUTPUT extends LogKey case object RELATIVE_TOLERANCE extends LogKey @@ 
-592,10 +592,10 @@ object LogKeys { case object RESOURCE_PROFILE_TO_TOTAL_EXECS extends LogKey case object RESPONSE extends LogKey case object RESPONSE_BODY_SIZE extends LogKey + case object REST_PROTOCOL_RESPONSE_MESSAGE_TYPE extends LogKey case object RESULT extends LogKey case object RESULT_SIZE_BYTES extends LogKey case object RESULT_SIZE_BYTES_MAX extends LogKey - case object REST_PROTOCOL_RESPONSE_MESSAGE_TYPE extends LogKey case object RETRY_COUNT extends LogKey case object RETRY_INTERVAL extends LogKey case object RETRY_WAIT_TIME extends LogKey @@ -654,8 +654,8 @@ object LogKeys { case object SQL_TEXT extends LogKey case object SRC_PATH extends LogKey case object STAGE extends LogKey - case object STAGES extends LogKey case object STAGE2 extends LogKey + case object STAGES extends LogKey case object STAGE_ATTEMPT extends LogKey case object STAGE_ATTEMPT_ID extends LogKey case object STAGE_ATTEMPT_NUMBER extends LogKey @@ -758,8 +758,8 @@ object LogKeys { case object URI extends LogKey case object URIS extends LogKey case object URL extends LogKey - case object URLS extends LogKey case object URL2 extends LogKey + case object URLS extends LogKey case object USER_ID extends LogKey case object USER_NAME extends LogKey case object UUID extends LogKey @@ -774,8 +774,8 @@ object LogKeys { case object WEIGHT extends LogKey case object WORKER extends LogKey case object WORKER_HOST extends LogKey - case object WORKER_MEMORY extends LogKey case object WORKER_ID extends LogKey + case object WORKER_MEMORY extends LogKey case object WORKER_PORT extends LogKey case object WORKER_URL extends LogKey case object WRITE_AHEAD_LOG_INFO extends LogKey From cb0b9803674f6554c7d98c1cb6ce05f13144b77c Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Sat, 25 May 2024 15:56:03 +1000 Subject: [PATCH 06/12] Migrate to structured logging --- .../internal/plugin/PluginContainer.scala | 19 +++++++----- .../spark/status/AppStatusListener.scala | 4 +-- .../apache/spark/status/AppStatusStore.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 29 ++++++++++--------- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index 261e016ce9bf..a0c07bd75f88 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -22,7 +22,7 @@ import scala.util.{Either, Left, Right} import org.apache.spark.{SparkContext, SparkEnv, TaskFailedReason} import org.apache.spark.api.plugin._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceInformation import org.apache.spark.util.Utils @@ -56,7 +56,7 @@ private class DriverPluginContainer( sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v) } } - logInfo(s"Initialized driver component for plugin $name.") + logInfo(log"Initialized driver component for plugin ${MDC(LogKeys.CLASS_NAME, name)}.") Some((p.getClass().getName(), driverPlugin, ctx)) } else { None @@ -83,7 +83,7 @@ private class DriverPluginContainer( plugin.shutdown() } catch { case t: Throwable => - logInfo(s"Exception while shutting down plugin $name.", t) + logInfo(log"Exception while shutting down plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -125,7 +125,7 @@ private class ExecutorPluginContainer(
executorPlugin.init(ctx, extraConf) ctx.registerMetrics() - logInfo(s"Initialized executor component for plugin $name.") + logInfo(log"Initialized executor component for plugin ${MDC(LogKeys.CLASS_NAME, name)}.") Some(p.getClass().getName() -> executorPlugin) } else { None @@ -144,7 +144,7 @@ private class ExecutorPluginContainer( plugin.shutdown() } catch { case t: Throwable => - logInfo(s"Exception while shutting down plugin $name.", t) + logInfo(log"Exception while shutting down plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -155,7 +155,8 @@ private class ExecutorPluginContainer( plugin.onTaskStart() } catch { case t: Throwable => - logInfo(s"Exception while calling onTaskStart on plugin $name.", t) + logInfo(log"Exception while calling onTaskStart on" + + log" plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -166,7 +167,8 @@ private class ExecutorPluginContainer( plugin.onTaskSucceeded() } catch { case t: Throwable => - logInfo(s"Exception while calling onTaskSucceeded on plugin $name.", t) + logInfo(log"Exception while calling onTaskSucceeded on" + + log" plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -177,7 +179,8 @@ private class ExecutorPluginContainer( plugin.onTaskFailed(failureReason) } catch { case t: Throwable => - logInfo(s"Exception while calling onTaskFailed on plugin $name.", t) + logInfo(log"Exception while calling onTaskFailed on" + + log" plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 24f4ff1bd672..5c93bf4bf77a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.CPUS_PER_TASK import org.apache.spark.internal.config.Status._ import org.apache.spark.resource.ResourceProfile.CPUS @@ -662,7 +662,7 @@ private[spark] class AppStatusListener( case e: TaskFailedReason => // All other failure cases Some(e.toErrorString) case other => - logInfo(s"Unhandled task end reason: $other") + logInfo(log"Unhandled task end reason: ${MDC(LogKeys.REASON, other)}") None } task.errorMessage = errorMessage diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 29e21e7f5ffe..87f876467c30 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -861,7 +861,7 @@ private[spark] object AppStatusStore extends Logging { def createStorePath(rootDir: String): Option[File] = { try { val localDir = Utils.createDirectory(rootDir, "spark-ui") - logInfo(s"Created spark ui store directory at $rootDir") + logInfo(log"Created spark ui store directory at ${MDC(PATH, rootDir)}") Some(localDir) } catch { case e: IOException => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9a669021be75..0ac1405abe6c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -564,10 +564,8 @@ private[spark] object Utils // Do 
nothing if the file contents are the same, i.e. this file has been copied // previously. logInfo( - "%s has been previously copied to %s".format( - sourceFile.getAbsolutePath, - destFile.getAbsolutePath - ) + log"${MDC(SOURCE_PATH, sourceFile.getAbsolutePath)} has been previously" + + log" copied to ${MDC(DESTINATION_PATH, destFile.getAbsolutePath)}" ) return } @@ -577,7 +575,8 @@ private[spark] object Utils if (removeSourceFile) { Files.move(sourceFile.toPath, destFile.toPath) } else { - logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}") + logInfo(log"Copying ${MDC(SOURCE_PATH, sourceFile.getAbsolutePath)}" + + log" to ${MDC(DESTINATION_PATH, destFile.getAbsolutePath)}") copyRecursive(sourceFile, destFile) } } @@ -1203,7 +1202,7 @@ private[spark] object Utils val process = builder.start() if (redirectStderr) { val threadName = "redirect stderr for command " + command(0) - def log(s: String): Unit = logInfo(s) + def log(s: String): Unit = logInfo(log"${MDC(LINE, s)}") processStreamByLine(threadName, process.getErrorStream, log) } process @@ -2166,7 +2165,8 @@ private[spark] object Utils } try { val (service, port) = startService(tryPort) - logInfo(s"Successfully started service$serviceString on port $port.") + logInfo(log"Successfully started service${MDC(SERVICE_NAME, serviceString)}" + + log" on port ${MDC(PORT, port)}.") return (service, port) } catch { case e: Exception if isBindCollision(e) => @@ -2541,9 +2541,10 @@ private[spark] object Utils conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS), conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max - logInfo(s"Using initial executors = $initialExecutors, max of " + - s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " + - s"${EXECUTOR_INSTANCES.key}") + logInfo(log"Using initial executors = ${MDC(NUM_EXECUTORS, initialExecutors)}, max of " + + log"${MDC(CONFIG, DYN_ALLOCATION_INITIAL_EXECUTORS.key)}," + + log"${MDC(CONFIG2, DYN_ALLOCATION_MIN_EXECUTORS.key)} and" + + log" ${MDC(CONFIG3, EXECUTOR_INSTANCES.key)}") initialExecutors } @@ -2731,7 +2732,7 @@ private[spark] object Utils e.getCause() match { case uoe: UnsupportedOperationException => logDebug(s"Extension $name not being initialized.", uoe) - logInfo(s"Extension $name not being initialized.") + logInfo(log"Extension ${MDC(CLASS_NAME, name)} not being initialized.") None case null => throw e @@ -2755,8 +2756,8 @@ private[spark] object Utils // To handle master URLs, e.g., k8s://host:port. if (!masterWithoutK8sPrefix.contains("://")) { val resolvedURL = s"https://$masterWithoutK8sPrefix" - logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + - s"URL is $resolvedURL.") + logInfo(log"No scheme specified for kubernetes master URL, so defaulting to https." 
+ + log" Resolved URL is ${MDC(LogKeys.URL, resolvedURL)}.") return s"k8s://$resolvedURL" } @@ -3009,7 +3010,7 @@ private[spark] object Utils entry = in.getNextEntry() } in.close() // so that any error in closing does not get ignored - logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}") + logInfo(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}") } finally { // Close everything no matter what happened IOUtils.closeQuietly(in) From 6db64bd8b3b35df1b48a4f6e8300f48ae983592f Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 29 May 2024 12:57:23 +1000 Subject: [PATCH 07/12] Resolve review --- .../src/main/scala/org/apache/spark/internal/LogKey.scala | 1 + .../main/scala/org/apache/spark/executor/Executor.scala | 7 ++++--- .../org/apache/spark/internal/io/SparkHadoopWriter.scala | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 1e43d206570c..544176a5f892 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -784,6 +784,7 @@ object LogKeys { case object URL extends LogKey case object URL2 extends LogKey case object URLS extends LogKey + case object EXECUTOR_USER_CLASS_PATH_FIRST extends LogKey case object USER_ID extends LogKey case object USER_NAME extends LogKey case object UUID extends LogKey diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index af37d61bfd82..bc023791fc89 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -221,7 +221,7 @@ private[spark] class Executor( if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) { Utils.deleteRecursively(sessionBasedRoot) } - logInfo(log"Session evicted: ${LogMDC(UUID, state.sessionUUID)}") + logInfo(log"Session evicted: ${LogMDC(SESSION_ID, state.sessionUUID)}") } }) .build[String, IsolatedSessionState] @@ -1087,7 +1087,8 @@ private[spark] class Executor( private def createClassLoader(urls: Array[URL], useStub: Boolean): MutableURLClassLoader = { logInfo( log"Starting executor with user classpath" + - log" (userClassPathFirst = ${LogMDC(CLASS_PATH, userClassPathFirst)}): " + + log" (userClassPathFirst =" + + log" ${LogMDC(LogKeys.EXECUTOR_USER_CLASS_PATH_FIRST, userClassPathFirst)}): " + log"${LogMDC(URLS, urls.mkString("'", ",", "'"))}" ) @@ -1138,7 +1139,7 @@ private[spark] class Executor( parent } logInfo(log"Created or updated repl class loader ${LogMDC(CLASS_LOADER, classLoader)}" + - log" for ${LogMDC(UUID, sessionUUID)}.") + log" for ${LogMDC(SESSION_ID, sessionUUID)}.") classLoader } diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index f86144d1e889..db961b3c42f4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import 
org.apache.spark.internal.LogKeys.{JOB_ID, TASK_ATTEMPT_ID, TOTAL_TIME} +import org.apache.spark.internal.LogKeys.{DURATION, JOB_ID, TASK_ATTEMPT_ID} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} @@ -102,7 +102,7 @@ object SparkHadoopWriter extends Logging { val (_, duration) = Utils .timeTakenMs { committer.commitJob(jobContext, ret.toImmutableArraySeq) } logInfo(log"Write Job ${MDC(JOB_ID, jobContext.getJobID)} committed." + - log" Elapsed time: ${MDC(TOTAL_TIME, duration)} ms.") + log" Elapsed time: ${MDC(DURATION, duration)} ms.") } catch { case cause: Throwable => logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause) From 7865b7281e11ce7cd2ee455266ca6ddd48109d84 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 29 May 2024 12:58:54 +1000 Subject: [PATCH 08/12] Fix key ordering --- .../utils/src/main/scala/org/apache/spark/internal/LogKey.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 544176a5f892..2b80b8efef2e 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -206,6 +206,7 @@ object LogKeys { case object EXECUTOR_SHUFFLE_INFO extends LogKey case object EXECUTOR_STATE extends LogKey case object EXECUTOR_TIMEOUT extends LogKey + case object EXECUTOR_USER_CLASS_PATH_FIRST extends LogKey case object EXEC_AMOUNT extends LogKey case object EXISTING_FILE extends LogKey case object EXISTING_PATH extends LogKey @@ -784,7 +785,6 @@ object LogKeys { case object URL extends LogKey case object URL2 extends LogKey case object URLS extends LogKey - case object EXECUTOR_USER_CLASS_PATH_FIRST extends LogKey case object USER_ID extends LogKey case object USER_NAME extends LogKey case object UUID extends LogKey From e970298bca138aecca9f688371511146fd3b3389 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 29 May 2024 14:29:50 +1000 Subject: [PATCH 09/12] fix ambiguous import. 
Remove unused key --- .../org/apache/spark/internal/LogKey.scala | 22 ------------------- .../org/apache/spark/executor/Executor.scala | 3 ++- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 2b80b8efef2e..b5acdc434cd2 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -32,7 +32,6 @@ trait LogKey { object LogKeys { case object ACCUMULATOR_ID extends LogKey case object ACL_ENABLED extends LogKey - case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey case object ADDED_JARS extends LogKey @@ -151,8 +150,6 @@ object LogKeys { case object DATA_FILE extends LogKey case object DATA_SOURCE extends LogKey case object DATA_SOURCES extends LogKey - case object DECOMMISSION_INFO extends LogKey - case object DEFAULT_COMPACTION_INTERVAL extends LogKey case object DEFAULT_COMPACT_INTERVAL extends LogKey case object DEFAULT_ISOLATION_LEVEL extends LogKey case object DEFAULT_NAME extends LogKey @@ -255,8 +252,6 @@ object LogKeys { case object HADOOP_VERSION extends LogKey case object HASH_JOIN_KEYS extends LogKey case object HASH_MAP_SIZE extends LogKey - case object HAS_R_PACKAGE extends LogKey - case object HEADING extends LogKey case object HEARTBEAT extends LogKey case object HEARTBEAT_INTERVAL extends LogKey case object HISTORY_DIR extends LogKey @@ -281,7 +276,6 @@ object LogKeys { case object INIT extends LogKey case object INITIAL_CAPACITY extends LogKey case object INITIAL_HEARTBEAT_INTERVAL extends LogKey - case object INITIAL_REGISTRATION_RETRIES extends LogKey case object INIT_MODE extends LogKey case object INPUT extends LogKey case object INPUT_SPLIT extends LogKey @@ -294,7 +288,6 @@ object LogKeys { case object JAVA_VERSION extends LogKey case object JAVA_VM_NAME extends LogKey case object JOB_ID extends LogKey - case object JOB_TAG extends LogKey case object JOIN_CONDITION extends LogKey case object JOIN_CONDITION_SUB_EXPR extends LogKey case object JOIN_TYPE extends LogKey @@ -370,10 +363,8 @@ object LogKeys { case object METRIC_NAME extends LogKey case object MINI_BATCH_FRACTION extends LogKey case object MIN_COMPACTION_BATCH_ID extends LogKey - case object MIN_FREQUENT_PATTERN_COUNT extends LogKey case object MIN_NUM_FREQUENT_PATTERN extends LogKey case object MIN_POINT_PER_CLUSTER extends LogKey - case object MIN_REGISTER_RATIO extends LogKey case object MIN_SHARE extends LogKey case object MIN_SIZE extends LogKey case object MIN_TIME extends LogKey @@ -441,7 +432,6 @@ object LogKeys { case object NUM_FILES_FAILED_TO_DELETE extends LogKey case object NUM_FILES_REUSED extends LogKey case object NUM_FREQUENT_ITEMS extends LogKey - case object NUM_GRACEFULLY_DECOMMISSIONED extends LogKey case object NUM_HOST_LOCAL_BLOCKS extends LogKey case object NUM_INDEX_FILE extends LogKey case object NUM_INDEX_FILES extends LogKey @@ -535,7 +525,6 @@ object LogKeys { case object PATHS extends LogKey case object PEER extends LogKey case object PERCENT extends LogKey - case object PERCENTILE_HEADER extends LogKey case object PIPELINE_STAGE_UID extends LogKey case object PLUGIN_NAME extends LogKey case object POD_ID extends LogKey @@ -579,7 +568,6 @@ object LogKeys { case object QUERY_RUN_ID extends LogKey case object RANGE extends LogKey case 
object RDD_CHECKPOINT_DIR extends LogKey - case object RDD_CREATION_SITE extends LogKey case object RDD_DEBUG_STRING extends LogKey case object RDD_DESCRIPTION extends LogKey case object RDD_ID extends LogKey @@ -613,13 +601,10 @@ object LogKeys { case object RESOURCE_PROFILE_ID extends LogKey case object RESOURCE_PROFILE_IDS extends LogKey case object RESOURCE_PROFILE_TO_TOTAL_EXECS extends LogKey - case object RESPONSE extends LogKey case object RESPONSE_BODY_SIZE extends LogKey - case object REST_PROTOCOL_RESPONSE_MESSAGE_TYPE extends LogKey case object RESULT extends LogKey case object RESULT_SIZE_BYTES extends LogKey case object RESULT_SIZE_BYTES_MAX extends LogKey - case object RETRY_COUNT extends LogKey case object RETRY_INTERVAL extends LogKey case object RETRY_WAIT_TIME extends LogKey case object RIGHT_EXPR extends LogKey @@ -679,8 +664,6 @@ object LogKeys { case object STAGE extends LogKey case object STAGES extends LogKey case object STAGE_ATTEMPT extends LogKey - case object STAGE_ATTEMPT_ID extends LogKey - case object STAGE_ATTEMPT_NUMBER extends LogKey case object STAGE_ID extends LogKey case object STAGE_NAME extends LogKey case object START_INDEX extends LogKey @@ -690,7 +673,6 @@ object LogKeys { case object STATE_STORE_VERSION extends LogKey case object STATS extends LogKey case object STATUS extends LogKey - case object STAT_COUNTER extends LogKey case object STDERR extends LogKey case object STOP_SITE_SHORT_FORM extends LogKey case object STORAGE_LEVEL extends LogKey @@ -719,7 +701,6 @@ object LogKeys { case object TARGET_NUM_EXECUTOR extends LogKey case object TARGET_NUM_EXECUTOR_DELTA extends LogKey case object TARGET_PATH extends LogKey - case object TASK extends LogKey case object TASK_ATTEMPT_ID extends LogKey case object TASK_ID extends LogKey case object TASK_LOCALITY extends LogKey @@ -760,8 +741,6 @@ object LogKeys { case object TOPIC_PARTITION_OFFSET_RANGE extends LogKey case object TOTAL extends LogKey case object TOTAL_EFFECTIVE_TIME extends LogKey - case object TOTAL_RECORDS_READ extends LogKey - case object TOTAL_REGISTRATION_RETRIES extends LogKey case object TOTAL_SIZE extends LogKey case object TOTAL_TIME extends LogKey case object TOTAL_TIME_READ extends LogKey @@ -800,7 +779,6 @@ object LogKeys { case object WORKER extends LogKey case object WORKER_HOST extends LogKey case object WORKER_ID extends LogKey - case object WORKER_MEMORY extends LogKey case object WORKER_PORT extends LogKey case object WORKER_URL extends LogKey case object WRITE_AHEAD_LOG_INFO extends LogKey diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index bc023791fc89..c55020f73b25 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -42,6 +42,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, LogKeys, MDC => LogMDC} import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.config.{EXECUTOR_USER_CLASS_PATH_FIRST => EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG} import org.apache.spark.internal.config._ import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} @@ -171,7 +172,7 @@ private[spark] class Executor( } // Whether to load classes in user jars before those in Spark jars - private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST) + 
private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG) // Whether to monitor killed / interrupted tasks private val taskReaperEnabled = conf.get(TASK_REAPER_ENABLED) From a408ebc9e1f034fdc1ab251589501d9db8f0ab5b Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 29 May 2024 15:04:21 +1000 Subject: [PATCH 10/12] Reorder import --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c55020f73b25..53e84c54079d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -40,10 +40,10 @@ import org.slf4j.MDC import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.{Logging, LogKeys, MDC => LogMDC} -import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config.{EXECUTOR_USER_CLASS_PATH_FIRST => EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG} import org.apache.spark.internal.config._ +import org.apache.spark.internal.{Logging, LogKeys, MDC => LogMDC} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.metrics.source.JVMCPUSource From 32d0cd21c1382229104c272ede8f9c0f56eaaa4f Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 29 May 2024 16:39:25 +1000 Subject: [PATCH 11/12] Reorder import --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 53e84c54079d..4e5d151468d8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -40,10 +40,10 @@ import org.slf4j.MDC import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.config.{EXECUTOR_USER_CLASS_PATH_FIRST => EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG} -import org.apache.spark.internal.config._ import org.apache.spark.internal.{Logging, LogKeys, MDC => LogMDC} import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.{EXECUTOR_USER_CLASS_PATH_FIRST => EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG} import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.metrics.source.JVMCPUSource From f1ed4f230204326c08c7be2fb96a0065ff746dc8 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 29 May 2024 10:07:05 -0700 Subject: [PATCH 12/12] Update common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala --- .../utils/src/main/scala/org/apache/spark/internal/LogKey.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index b5acdc434cd2..c9b4da5919f9 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -382,7 +382,6 @@ object LogKeys { case object NEW_VALUE extends LogKey case object NEXT_RENEWAL_TIME 
extends LogKey case object NODES extends LogKey - case object NODE_IDS extends LogKey case object NODE_LOCATION extends LogKey case object NON_BUILT_IN_CONNECTORS extends LogKey case object NORM extends LogKey
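Illustrative sketch (not part of the patch series): the pattern these commits migrate to replaces plain Scala string interpolation with the log"..." interpolator and MDC-tagged values keyed by LogKeys entries, as used throughout the diffs above. A minimal example, assuming a class that mixes in Spark's Logging trait; the class and method names below are hypothetical and exist only for illustration.

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{EXECUTOR_ID, NUM_EXECUTORS}

// Hypothetical component, for illustration only.
class ExampleComponent extends Logging {
  def onExecutorRegistered(executorId: String, total: Int): Unit = {
    // Before: plain interpolation, no structured fields
    //   logInfo(s"New executor $executorId has registered (new total is $total)")
    // After: log"..." interpolator with each value wrapped in MDC and keyed by a LogKeys entry,
    // so the executor id and count are emitted as queryable fields in the structured log output.
    logInfo(log"New executor ${MDC(EXECUTOR_ID, executorId)} has registered" +
      log" (new total is ${MDC(NUM_EXECUTORS, total)})")
  }
}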