
Commit 874d033

[SPARK-47574][INFRA] Introduce Structured Logging Framework
### What changes were proposed in this pull request?

Introduce a Structured Logging Framework, as proposed in [SPIP: Structured Logging Framework for Apache Spark](https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing).

* The default logging output format will be JSON lines. For example:
```
{
  "ts": "2023-03-12T12:02:46.661-0700",
  "level": "ERROR",
  "msg": "Cannot determine whether executor 289 is alive or not",
  "context": {
    "executor_id": "289"
  },
  "exception": {
    "class": "org.apache.spark.SparkException",
    "msg": "Exception thrown in awaitResult",
    "stackTrace": "..."
  },
  "source": "BlockManagerMasterEndpoint"
}
```
* Introduce a new configuration, `spark.log.structuredLogging.enabled`, to set the default log4j configuration. It is true by default; users can disable it to get plain-text log output.
* The change starts with the `logError` method. Example change to the API, from
```
logError(s"Cannot determine whether executor $executorId is alive or not.", e)
```
to
```
logError(log"Cannot determine whether executor ${MDC(EXECUTOR_ID, executorId)} is alive or not.", e)
```

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging. This transition changes the format of the default log output from plain text to JSON lines, making it more analyzable.

### Does this PR introduce _any_ user-facing change?

Yes, the default log output format will be JSON lines instead of plain text. Users can restore the plain-text output by disabling the configuration `spark.log.structuredLogging.enabled`. If a user has a customized log4j configuration, there is no change in the log output.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Yes, some of the code comments are from GitHub Copilot.

Closes #45729 from gengliangwang/LogInterpolator.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
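Since each log line is a self-contained JSON object, the output can be loaded and queried with Spark itself, which is the "more analyzable" point above. A minimal sketch of that kind of analysis (the log path, app name, and object name here are illustrative assumptions, not part of this commit; the field names follow the example output):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical analysis of JSON-lines Spark logs with Spark SQL.
object LogAnalysisSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("log-analysis").getOrCreate()
    // Assumed log location; each line is one JSON log event.
    val logs = spark.read.json("/tmp/spark-app.log")
    logs.filter(logs("level") === "ERROR")
      .select("ts", "msg", "context.executor_id") // dotted path into the MDC struct
      .show(false)
    spark.stop()
  }
}
```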
Parent: a8b247e

File tree: 14 files changed, +441 −4 lines


common/utils/pom.xml

Lines changed: 4 additions & 0 deletions
```diff
@@ -98,6 +98,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-layout-template-json</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
```

common/utils/src/main/resources/org/apache/spark/SparkLayout.json

Lines changed: 38 additions & 0 deletions

```json
{
  "ts": {
    "$resolver": "timestamp"
  },
  "level": {
    "$resolver": "level",
    "field": "name"
  },
  "msg": {
    "$resolver": "message",
    "stringified": true
  },
  "context": {
    "$resolver": "mdc"
  },
  "exception": {
    "class": {
      "$resolver": "exception",
      "field": "className"
    },
    "msg": {
      "$resolver": "exception",
      "field": "message",
      "stringified": true
    },
    "stacktrace": {
      "$resolver": "exception",
      "field": "stackTrace",
      "stackTrace": {
        "stringified": true
      }
    }
  },
  "logger": {
    "$resolver": "logger",
    "field": "name"
  }
}
```
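Each `$resolver` in this template pulls one field off the Log4j log event; in particular, the `mdc` resolver is what surfaces the logging context as the `context` object in the output. A minimal sketch of that mechanism, independent of Spark's new interpolator (the logger name and values are illustrative):

```scala
import org.apache.logging.log4j.CloseableThreadContext
import org.slf4j.LoggerFactory

// Any key/value placed in the thread context while a message is logged is
// picked up by the "mdc" resolver and rendered under "context".
object MdcSketch {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger("demo")
    val ctx = CloseableThreadContext.put("executor_id", "289")
    try {
      logger.error("Cannot determine whether executor 289 is alive or not")
    } finally {
      ctx.close() // remove the entry so it does not leak into later log lines
    }
  }
}
```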

common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties

Lines changed: 2 additions & 2 deletions
```diff
@@ -22,8 +22,8 @@ rootLogger.appenderRef.stdout.ref = console
 appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_ERR
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
+appender.console.layout.type = JsonTemplateLayout
+appender.console.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

 # Settings to quiet third party logs that are too verbose
 logger.jetty.name = org.sparkproject.jetty
```
common/utils/src/main/resources/org/apache/spark/log4j2-pattern-layout-defaults.properties

Lines changed: 55 additions & 0 deletions

```properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.repl1.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.repl1.level = info
logger.repl2.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.repl2.level = info

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
# in SparkSQL with Hive support
logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.metastore.level = fatal
logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.hive_functionregistry.level = error

# Parquet related logging
logger.parquet.name = org.apache.parquet.CorruptStatistics
logger.parquet.level = error
logger.parquet2.name = parquet.CorruptStatistics
logger.parquet2.level = error
```
common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 25 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.internal

/**
 * Various keys used for mapped diagnostic contexts (MDC) in logging.
 * All structured logging keys should be defined here for standardization.
 */
object LogKey extends Enumeration {
  val EXECUTOR_ID = Value
}
```
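New keys are meant to be added to this enumeration as more log statements are migrated. A sketch of how a caller uses a key together with the `MDC` case class and `log` interpolator defined in `Logging.scala` below (the class and method names are hypothetical; the `logError` line itself mirrors the commit message example):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.EXECUTOR_ID

// Hypothetical caller; only the logError call reflects the new API.
class HeartbeatMonitorSketch extends Logging {
  def onExecutorLost(executorId: String, e: Throwable): Unit = {
    // EXECUTOR_ID is lower-cased to "executor_id" and emitted under "context"
    // in the JSON output; the value is also inlined in the message text.
    logError(log"Cannot determine whether executor ${MDC(EXECUTOR_ID, executorId)} is alive or not.", e)
  }
}
```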

common/utils/src/main/scala/org/apache/spark/internal/Logging.scala

Lines changed: 103 additions & 2 deletions
```diff
@@ -17,9 +17,12 @@

 package org.apache.spark.internal

+import java.util.Locale
+
 import scala.jdk.CollectionConverters._

-import org.apache.logging.log4j.{Level, LogManager}
+import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager}
+import org.apache.logging.log4j.CloseableThreadContext.Instance
 import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, Logger => Log4jLogger, LoggerContext}
 import org.apache.logging.log4j.core.appender.ConsoleAppender
 import org.apache.logging.log4j.core.config.DefaultConfiguration
@@ -29,6 +32,38 @@ import org.slf4j.{Logger, LoggerFactory}
 import org.apache.spark.internal.Logging.SparkShellLoggingFilter
 import org.apache.spark.util.SparkClassUtils

+/**
+ * Mapped Diagnostic Context (MDC) that will be used in log messages.
+ * The values of the MDC will be inline in the log message, while the key-value pairs will be
+ * part of the ThreadContext.
+ */
+case class MDC(key: LogKey.Value, value: String)
+
+/**
+ * Wrapper class for log messages that include a logging context.
+ * This is used as the return type of the string interpolator `LogStringContext`.
+ */
+case class MessageWithContext(message: String, context: Option[Instance])
+
+/**
+ * Companion class for lazy evaluation of the MessageWithContext instance.
+ */
+class LogEntry(messageWithContext: => MessageWithContext) {
+  def message: String = messageWithContext.message
+
+  def context: Option[Instance] = messageWithContext.context
+}
+
+/**
+ * Companion object for the wrapper to enable implicit conversions
+ */
+object LogEntry {
+  import scala.language.implicitConversions
+
+  implicit def from(msgWithCtx: => MessageWithContext): LogEntry =
+    new LogEntry(msgWithCtx)
+}
+
 /**
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
  * logging messages at different levels using methods that only evaluate parameters lazily if the
@@ -55,6 +90,33 @@ trait Logging {
     log_
   }

+  implicit class LogStringContext(val sc: StringContext) {
+    def log(args: MDC*): MessageWithContext = {
+      val processedParts = sc.parts.iterator
+      val sb = new StringBuilder(processedParts.next())
+      lazy val map = new java.util.HashMap[String, String]()
+
+      args.foreach { mdc =>
+        sb.append(mdc.value)
+        if (Logging.isStructuredLoggingEnabled) {
+          map.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
+        }
+
+        if (processedParts.hasNext) {
+          sb.append(processedParts.next())
+        }
+      }
+
+      // Create a CloseableThreadContext and apply the context map
+      val closeableContext = if (Logging.isStructuredLoggingEnabled) {
+        Some(CloseableThreadContext.putAll(map))
+      } else {
+        None
+      }
+      MessageWithContext(sb.toString(), closeableContext)
+    }
+  }
+
   // Log methods that take only a String
   protected def logInfo(msg: => String): Unit = {
     if (log.isInfoEnabled) log.info(msg)
@@ -76,6 +138,20 @@ trait Logging {
     if (log.isErrorEnabled) log.error(msg)
   }

+  protected def logError(entry: LogEntry): Unit = {
+    if (log.isErrorEnabled) {
+      log.error(entry.message)
+      entry.context.map(_.close())
+    }
+  }
+
+  protected def logError(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isErrorEnabled) {
+      log.error(entry.message, throwable)
+      entry.context.map(_.close())
+    }
+  }
+
   // Log methods that take Throwables (Exceptions/Errors) too
   protected def logInfo(msg: => String, throwable: Throwable): Unit = {
     if (log.isInfoEnabled) log.info(msg, throwable)
@@ -132,7 +208,11 @@ trait Logging {
     // scalastyle:off println
     if (Logging.islog4j2DefaultConfigured()) {
       Logging.defaultSparkLog4jConfig = true
-      val defaultLogProps = "org/apache/spark/log4j2-defaults.properties"
+      val defaultLogProps = if (Logging.isStructuredLoggingEnabled) {
+        "org/apache/spark/log4j2-defaults.properties"
+      } else {
+        "org/apache/spark/log4j2-pattern-layout-defaults.properties"
+      }
       Option(SparkClassUtils.getSparkClassLoader.getResource(defaultLogProps)) match {
         case Some(url) =>
           val context = LogManager.getContext(false).asInstanceOf[LoggerContext]
@@ -190,6 +270,7 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
+  @volatile private var structuredLoggingEnabled = true
   @volatile private[spark] var sparkShellThresholdLevel: Level = null
   @volatile private[spark] var setLogLevelPrinted: Boolean = false

@@ -259,6 +340,26 @@ private[spark] object Logging {
       .getConfiguration.isInstanceOf[DefaultConfiguration])
   }

+  /**
+   * Enable Structured logging framework.
+   */
+  private[spark] def enableStructuredLogging(): Unit = {
+    structuredLoggingEnabled = true
+  }
+
+  /**
+   * Disable Structured logging framework.
+   */
+  private[spark] def disableStructuredLogging(): Unit = {
+    structuredLoggingEnabled = false
+  }
+
+  /**
+   * Return true if Structured logging framework is enabled.
+   */
+  private[spark] def isStructuredLoggingEnabled: Boolean = {
+    structuredLoggingEnabled
+  }

   private[spark] class SparkShellLoggingFilter extends AbstractFilter {
     private var status = LifeCycle.State.INITIALIZING
```
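Note the by-name parameters in `LogEntry` and `LogEntry.from`: `logError(log"...")` only pays the interpolation and `CloseableThreadContext` cost when `log.isErrorEnabled` is true, because the `MessageWithContext` is not constructed until `entry.message` is first accessed, and the context is closed immediately after logging so MDC entries do not leak into later log lines on the same thread. A minimal standalone sketch of the lazy-evaluation pattern (all names illustrative; the real types live in `Logging.scala` above):

```scala
// Standalone sketch of the by-name lazy-evaluation trick used by LogEntry.
class LazyEntry(entry: => String) {   // by-name: not evaluated at the call site
  def message: String = entry         // evaluated only when accessed
}

object LazyDemo extends App {
  def interpolate(): String = { println("interpolating..."); "details" }

  val e = new LazyEntry(s"failed: ${interpolate()}")
  println("entry constructed; nothing interpolated yet")
  println(e.message)                  // "interpolating..." prints only here
}
```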
common/utils/src/test/resources/log4j2.properties

Lines changed: 50 additions & 0 deletions

```properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

rootLogger.level = info
rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}

appender.file.type = File
appender.file.name = File
appender.file.fileName = target/unit-tests.log
appender.file.layout.type = JsonTemplateLayout
appender.file.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Structured Logging Appender
appender.structured.type = File
appender.structured.name = structured
appender.structured.fileName = target/structured.log
appender.structured.layout.type = JsonTemplateLayout
appender.structured.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Pattern Logging Appender
appender.pattern.type = File
appender.pattern.name = pattern
appender.pattern.fileName = target/pattern.log
appender.pattern.layout.type = PatternLayout
appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Custom loggers
logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
logger.structured.level = info
logger.structured.appenderRefs = structured
logger.structured.appenderRef.structured.ref = structured

logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
logger.pattern.level = info
logger.pattern.appenderRefs = pattern
logger.pattern.appenderRef.pattern.ref = pattern
```