diff --git a/common/utils/pom.xml b/common/utils/pom.xml
index d360e041dd64..1dbf2a769fff 100644
--- a/common/utils/pom.xml
+++ b/common/utils/pom.xml
@@ -98,6 +98,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-layout-template-json</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/common/utils/src/main/resources/org/apache/spark/SparkLayout.json b/common/utils/src/main/resources/org/apache/spark/SparkLayout.json
new file mode 100644
index 000000000000..b0d8ea27ffbc
--- /dev/null
+++ b/common/utils/src/main/resources/org/apache/spark/SparkLayout.json
@@ -0,0 +1,38 @@
+{
+  "ts": {
+    "$resolver": "timestamp"
+  },
+  "level": {
+    "$resolver": "level",
+    "field": "name"
+  },
+  "msg": {
+    "$resolver": "message",
+    "stringified": true
+  },
+  "context": {
+    "$resolver": "mdc"
+  },
+  "exception": {
+    "class": {
+      "$resolver": "exception",
+      "field": "className"
+    },
+    "msg": {
+      "$resolver": "exception",
+      "field": "message",
+      "stringified": true
+    },
+    "stacktrace": {
+      "$resolver": "exception",
+      "field": "stackTrace",
+      "stackTrace": {
+        "stringified": true
+      }
+    }
+  },
+  "logger": {
+    "$resolver": "logger",
+    "field": "name"
+  }
+}
\ No newline at end of file
diff --git a/common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties b/common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties
index 777c5f2b2591..9be86b650d09 100644
--- a/common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties
+++ b/common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties
@@ -22,8 +22,8 @@ rootLogger.appenderRef.stdout.ref = console
 appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_ERR
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
+appender.console.layout.type = JsonTemplateLayout
+appender.console.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json
 
 # Settings to quiet third party logs that are too verbose
 logger.jetty.name = org.sparkproject.jetty
diff --git a/common/utils/src/main/resources/org/apache/spark/log4j2-pattern-layout-defaults.properties b/common/utils/src/main/resources/org/apache/spark/log4j2-pattern-layout-defaults.properties
new file mode 100644
index 000000000000..777c5f2b2591
--- /dev/null
+++ b/common/utils/src/main/resources/org/apache/spark/log4j2-pattern-layout-defaults.properties
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = console
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
+
+# Settings to quiet third party logs that are too verbose
+logger.jetty.name = org.sparkproject.jetty
+logger.jetty.level = warn
+logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
+logger.jetty2.level = error
+logger.repl1.name = org.apache.spark.repl.SparkIMain$exprTyper
+logger.repl1.level = info
+logger.repl2.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
+logger.repl2.level = info
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that
+# the user can have different defaults for the shell and regular Spark apps.
+logger.repl.name = org.apache.spark.repl.Main
+logger.repl.level = warn
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
+# in SparkSQL with Hive support
+logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
+logger.metastore.level = fatal
+logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
+logger.hive_functionregistry.level = error
+
+# Parquet related logging
+logger.parquet.name = org.apache.parquet.CorruptStatistics
+logger.parquet.level = error
+logger.parquet2.name = parquet.CorruptStatistics
+logger.parquet2.level = error
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
new file mode 100644
index 000000000000..6ab6ac0eb58a
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal
+
+/**
+ * Various keys used for mapped diagnostic contexts (MDC) in logging.
+ * All structured logging keys should be defined here for standardization.
+ */
+object LogKey extends Enumeration {
+  val EXECUTOR_ID = Value
+}
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 80c622bd5328..d7b66faa6070 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.internal
 
+import java.util.Locale
+
 import scala.jdk.CollectionConverters._
 
-import org.apache.logging.log4j.{Level, LogManager}
+import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager}
+import org.apache.logging.log4j.CloseableThreadContext.Instance
 import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, Logger => Log4jLogger, LoggerContext}
 import org.apache.logging.log4j.core.appender.ConsoleAppender
 import org.apache.logging.log4j.core.config.DefaultConfiguration
@@ -29,6 +32,38 @@ import org.slf4j.{Logger, LoggerFactory}
 import org.apache.spark.internal.Logging.SparkShellLoggingFilter
 import org.apache.spark.util.SparkClassUtils
 
+/**
+ * Mapped Diagnostic Context (MDC) that will be used in log messages.
+ * The values of the MDC will be inline in the log message, while the key-value pairs will be
+ * part of the ThreadContext.
+ */
+case class MDC(key: LogKey.Value, value: String)
+
+/**
+ * Wrapper class for log messages that include a logging context.
+ * This is used as the return type of the string interpolator `LogStringContext`.
+ */
+case class MessageWithContext(message: String, context: Option[Instance])
+
+/**
+ * Companion class for lazy evaluation of the MessageWithContext instance.
+ */
+class LogEntry(messageWithContext: => MessageWithContext) {
+  def message: String = messageWithContext.message
+
+  def context: Option[Instance] = messageWithContext.context
+}
+
+/**
+ * Companion object for the wrapper to enable implicit conversions
+ */
+object LogEntry {
+  import scala.language.implicitConversions
+
+  implicit def from(msgWithCtx: => MessageWithContext): LogEntry =
+    new LogEntry(msgWithCtx)
+}
+
 /**
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
  * logging messages at different levels using methods that only evaluate parameters lazily if the
@@ -55,6 +90,33 @@ trait Logging {
     log_
   }
 
+  implicit class LogStringContext(val sc: StringContext) {
+    def log(args: MDC*): MessageWithContext = {
+      val processedParts = sc.parts.iterator
+      val sb = new StringBuilder(processedParts.next())
+      lazy val map = new java.util.HashMap[String, String]()
+
+      args.foreach { mdc =>
+        sb.append(mdc.value)
+        if (Logging.isStructuredLoggingEnabled) {
+          map.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
+        }
+
+        if (processedParts.hasNext) {
+          sb.append(processedParts.next())
+        }
+      }
+
+      // Create a CloseableThreadContext and apply the context map
+      val closeableContext = if (Logging.isStructuredLoggingEnabled) {
+        Some(CloseableThreadContext.putAll(map))
+      } else {
+        None
+      }
+      MessageWithContext(sb.toString(), closeableContext)
+    }
+  }
+
   // Log methods that take only a String
   protected def logInfo(msg: => String): Unit = {
     if (log.isInfoEnabled) log.info(msg)
@@ -76,6 +138,20 @@ trait Logging {
     if (log.isErrorEnabled) log.error(msg)
   }
 
+  protected def logError(entry: LogEntry): Unit = {
+    if (log.isErrorEnabled) {
+      log.error(entry.message)
+      entry.context.map(_.close())
+    }
+  }
+
+  protected def logError(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isErrorEnabled) {
+      log.error(entry.message, throwable)
+      entry.context.map(_.close())
+    }
+  }
+
   // Log methods that take Throwables (Exceptions/Errors) too
   protected def logInfo(msg: => String, throwable: Throwable): Unit = {
     if (log.isInfoEnabled) log.info(msg, throwable)
@@ -132,7 +208,11 @@ trait Logging {
       // scalastyle:off println
       if (Logging.islog4j2DefaultConfigured()) {
         Logging.defaultSparkLog4jConfig = true
-        val defaultLogProps = "org/apache/spark/log4j2-defaults.properties"
+        val defaultLogProps = if (Logging.isStructuredLoggingEnabled) {
+          "org/apache/spark/log4j2-defaults.properties"
+        } else {
+          "org/apache/spark/log4j2-pattern-layout-defaults.properties"
+        }
         Option(SparkClassUtils.getSparkClassLoader.getResource(defaultLogProps)) match {
           case Some(url) =>
             val context = LogManager.getContext(false).asInstanceOf[LoggerContext]
@@ -190,6 +270,7 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
+  @volatile private var structuredLoggingEnabled = true
   @volatile private[spark] var sparkShellThresholdLevel: Level = null
   @volatile private[spark] var setLogLevelPrinted: Boolean = false
 
@@ -259,6 +340,26 @@ private[spark] object Logging {
       .getConfiguration.isInstanceOf[DefaultConfiguration])
   }
 
+  /**
+   * Enable Structured logging framework.
+   */
+  private[spark] def enableStructuredLogging(): Unit = {
+    structuredLoggingEnabled = true
+  }
+
+  /**
+   * Disable Structured logging framework.
+   */
+  private[spark] def disableStructuredLogging(): Unit = {
+    structuredLoggingEnabled = false
+  }
+
+  /**
+   * Return true if Structured logging framework is enabled.
+   */
+  private[spark] def isStructuredLoggingEnabled: Boolean = {
+    structuredLoggingEnabled
+  }
 
   private[spark] class SparkShellLoggingFilter extends AbstractFilter {
     private var status = LifeCycle.State.INITIALIZING
diff --git a/common/utils/src/test/resources/log4j2.properties b/common/utils/src/test/resources/log4j2.properties
new file mode 100644
index 000000000000..2c7563ec8d3d
--- /dev/null
+++ b/common/utils/src/test/resources/log4j2.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rootLogger.level = info
+rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}
+
+appender.file.type = File
+appender.file.name = File
+appender.file.fileName = target/unit-tests.log
+appender.file.layout.type = JsonTemplateLayout
+appender.file.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json
+
+# Structured Logging Appender
+appender.structured.type = File
+appender.structured.name = structured
+appender.structured.fileName = target/structured.log
+appender.structured.layout.type = JsonTemplateLayout
+appender.structured.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json
+
+# Pattern Logging Appender
+appender.pattern.type = File
+appender.pattern.name = pattern
+appender.pattern.fileName = target/pattern.log
+appender.pattern.layout.type = PatternLayout
+appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
+
+# Custom loggers
+logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
+logger.structured.level = info
+logger.structured.appenderRefs = structured
+logger.structured.appenderRef.structured.ref = structured
+
+logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
+logger.pattern.level = info
+logger.pattern.appenderRefs = pattern
+logger.pattern.appenderRef.pattern.ref = pattern
diff --git a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala
new file mode 100644
index 000000000000..0c6ed89172e0
--- /dev/null
+++ b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+
+class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll {
+
+  override protected def logFilePath: String = "target/pattern.log"
+
+  override def beforeAll(): Unit = Logging.disableStructuredLogging()
+
+  override def afterAll(): Unit = Logging.enableStructuredLogging()
+
+  test("Pattern layout logging") {
+    val msg = "This is a log message"
+
+    val logOutput = captureLogOutput(() => logError(msg))
+    // scalastyle:off line.size.limit
+    val pattern = """.*ERROR PatternLoggingSuite: This is a log message\n""".r
+    // scalastyle:on
+    assert(pattern.matches(logOutput))
+  }
+
+  test("Pattern layout logging with MDC") {
+    logError(log"Lost executor ${MDC(EXECUTOR_ID, "1")}.")
+
+    val logOutput = captureLogOutput(() => logError(log"Lost executor ${MDC(EXECUTOR_ID, "1")}."))
+    val pattern = """.*ERROR PatternLoggingSuite: Lost executor 1.\n""".r
+    assert(pattern.matches(logOutput))
+  }
+
+  test("Pattern layout exception logging") {
+    val exception = new RuntimeException("OOM")
+
+    val logOutput = captureLogOutput(() =>
+      logError(log"Error in executor ${MDC(EXECUTOR_ID, "1")}.", exception))
+    assert(logOutput.contains("ERROR PatternLoggingSuite: Error in executor 1."))
+    assert(logOutput.contains("java.lang.RuntimeException: OOM"))
+  }
+}
diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
new file mode 100644
index 000000000000..eef9866a68b1
--- /dev/null
+++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.File
+import java.nio.file.Files
+
+import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+
+abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuite
+  with Logging {
+
+  protected def logFilePath: String
+
+  protected lazy val logFile: File = {
+    val pwd = new File(".").getCanonicalPath
+    new File(pwd + "/" + logFilePath)
+  }
+
+  // Runs the given function and returns the log output that it appended to the log file.
+  protected def captureLogOutput(f: () => Unit): String = {
+    val content = if (logFile.exists()) {
+      Files.readString(logFile.toPath)
+    } else {
+      ""
+    }
+    f()
+    val newContent = Files.readString(logFile.toPath)
+    newContent.substring(content.length)
+  }
+}
+
+class StructuredLoggingSuite extends LoggingSuiteBase {
+  private val className = this.getClass.getName.stripSuffix("$")
+  override def logFilePath: String = "target/structured.log"
+
+  test("Structured logging") {
+    val msg = "This is a log message"
+    val logOutput = captureLogOutput(() => logError(msg))
+
+    // scalastyle:off line.size.limit
+    val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log message","logger":"$className"}\n""".r
+    // scalastyle:on
+    assert(pattern.matches(logOutput))
+  }
+
+  test("Structured logging with MDC") {
+    val logOutput = captureLogOutput(() => logError(log"Lost executor ${MDC(EXECUTOR_ID, "1")}."))
+    assert(logOutput.nonEmpty)
+    // scalastyle:off line.size.limit
+    val pattern1 = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"Lost executor 1.","context":\\{"executor_id":"1"},"logger":"$className"}\n""".r
+    // scalastyle:on
+    assert(pattern1.matches(logOutput))
+  }
+
+  test("Structured exception logging with MDC") {
+    val exception = new RuntimeException("OOM")
+    val logOutput = captureLogOutput(() =>
+      logError(log"Error in executor ${MDC(EXECUTOR_ID, "1")}.", exception))
+    assert(logOutput.nonEmpty)
+    // scalastyle:off line.size.limit
+    val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"Error in executor 1.","context":\\{"executor_id":"1"},"exception":\\{"class":"java.lang.RuntimeException","msg":"OOM","stacktrace":.*},"logger":"$className"}\n""".r
+    // scalastyle:on
+    assert(pattern.matches(logOutput))
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 10c1dbe2054a..2cd326acf585 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -228,6 +228,11 @@ private[spark] class SparkSubmit extends Logging {
     val childClasspath = new ArrayBuffer[String]()
     val sparkConf = args.toSparkConf()
     if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
+    if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) {
+      Logging.enableStructuredLogging()
+    } else {
+      Logging.disableStructuredLogging()
+    }
     var childMainClass = ""
 
     // Set the cluster manager
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 49f24dfbd826..29d13cbf89c5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -141,6 +141,16 @@ package object config {
         "Ensure that memory overhead is a double greater than 0")
       .createWithDefault(0.1)
 
+  private[spark] val STRUCTURED_LOGGING_ENABLED =
+    ConfigBuilder("spark.log.structuredLogging.enabled")
+      .doc("When true, the default log4j output format is structured JSON lines, and there will " +
+        "be Mapped Diagnostic Context (MDC) from Spark added to the logs. This is useful for log " +
+        "aggregation and analysis tools. When false, the default log4j output will be plain " +
+        "text and no MDC from Spark will be set.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val DRIVER_LOG_LOCAL_DIR =
     ConfigBuilder("spark.driver.log.localDir")
       .doc("Specifies a local directory to write driver logs and enable Driver Log UI Tab.")
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 903c7a245af3..f990fd57d96f 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -188,6 +188,7 @@ libthrift/0.12.0//libthrift-0.12.0.jar
 log4j-1.2-api/2.22.1//log4j-1.2-api-2.22.1.jar
 log4j-api/2.22.1//log4j-api-2.22.1.jar
 log4j-core/2.22.1//log4j-core-2.22.1.jar
+log4j-layout-template-json/2.22.1//log4j-layout-template-json-2.22.1.jar
 log4j-slf4j2-impl/2.22.1//log4j-slf4j2-impl-2.22.1.jar
 logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index f42dfadb2a2a..8baab5ec082b 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -42,6 +42,8 @@ license: |
 
 - Since Spark 4.0, Spark uses the external shuffle service for deleting shuffle blocks for deallocated executors when the shuffle is no longer needed. To restore the legacy behavior, you can set `spark.shuffle.service.removeShuffle` to `false`.
 
+- Since Spark 4.0, the default log4j output has shifted from plain text to JSON lines to enhance analyzability. To revert to plain text output, you can either set `spark.log.structuredLogging.enabled` to `false`, or use a custom log4j configuration.
+
 ## Upgrading from Core 3.4 to 3.5
 
 - Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.
diff --git a/pom.xml b/pom.xml
index 637aa50f0314..c53a47f701fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -757,6 +757,11 @@
         <artifactId>log4j-core</artifactId>
         <version>${log4j.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-layout-template-json</artifactId>
+        <version>${log4j.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
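
Not part of the patch above: a minimal usage sketch of the logging API this diff introduces. The class ExecutorMonitor and its method are hypothetical names used only for illustration; MDC, LogKey.EXECUTOR_ID, the log string interpolator, and the logError(LogEntry, Throwable) overload all come from the changes shown in Logging.scala and LogKey.scala.

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.EXECUTOR_ID

// Hypothetical caller: any class mixing in the Logging trait gains the log"..." interpolator.
class ExecutorMonitor extends Logging {
  def onExecutorLost(executorId: String, cause: Throwable): Unit = {
    // With structured logging enabled (the default), this is emitted as a JSON line whose
    // "context" field carries {"executor_id": "<id>"}; with it disabled, only the plain
    // interpolated message "Lost executor <id>." is written by the pattern layout.
    logError(log"Lost executor ${MDC(EXECUTOR_ID, executorId)}.", cause)
  }
}

To keep plain-text output without supplying a custom log4j2 configuration, a job can be submitted with --conf spark.log.structuredLogging.enabled=false, which leads SparkSubmit to call Logging.disableStructuredLogging() as shown in the SparkSubmit hunk above.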