4 changes: 4 additions & 0 deletions common/utils/pom.xml
@@ -98,6 +98,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-layout-template-json</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
38 changes: 38 additions & 0 deletions common/utils/src/main/resources/org/apache/spark/SparkLayout.json
@@ -0,0 +1,38 @@
{
"ts": {
"$resolver": "timestamp"
},
"level": {
"$resolver": "level",
"field": "name"
},
"msg": {
"$resolver": "message",
"stringified": true
},
"context": {
"$resolver": "mdc"
},
"exception": {
"class": {
"$resolver": "exception",
"field": "className"
},
"msg": {
"$resolver": "exception",
"field": "message",
"stringified": true
},
"stacktrace": {
"$resolver": "exception",
"field": "stackTrace",
"stackTrace": {
"stringified": true
}
}
},
"logger": {
"$resolver": "logger",
"field": "name"
}
}
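
For illustration, JsonTemplateLayout renders each log event through this template as a single JSON object on one line; output would look roughly like the following (all field values here are made up, and the real output is not pretty-printed):

{
  "ts": "2024-04-01T12:34:56.789Z",
  "level": "ERROR",
  "msg": "Lost executor 42.",
  "context": {"executor_id": "42"},
  "exception": {
    "class": "java.io.IOException",
    "msg": "Connection reset",
    "stacktrace": "java.io.IOException: Connection reset\n\tat ..."
  },
  "logger": "org.apache.spark.scheduler.cluster.YarnScheduler"
}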
common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties
@@ -22,8 +22,8 @@ rootLogger.appenderRef.stdout.ref = console
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
common/utils/src/main/resources/org/apache/spark/log4j2-pattern-layout-defaults.properties
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.repl1.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.repl1.level = info
logger.repl2.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.repl2.level = info

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
# in SparkSQL with Hive support
logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.metastore.level = fatal
logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.hive_functionregistry.level = error

# Parquet related logging
logger.parquet.name = org.apache.parquet.CorruptStatistics
logger.parquet.level = error
logger.parquet2.name = parquet.CorruptStatistics
logger.parquet2.level = error
25 changes: 25 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal

/**
* Various keys used for mapped diagnostic contexts (MDC) in logging.
* All structured logging keys should be defined here for standardization.
*/
object LogKey extends Enumeration {
val EXECUTOR_ID = Value
}
105 changes: 103 additions & 2 deletions common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,9 +17,12 @@

package org.apache.spark.internal

import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager}
import org.apache.logging.log4j.CloseableThreadContext.Instance
import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, Logger => Log4jLogger, LoggerContext}
import org.apache.logging.log4j.core.appender.ConsoleAppender
import org.apache.logging.log4j.core.config.DefaultConfiguration
@@ -29,6 +32,38 @@ import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.internal.Logging.SparkShellLoggingFilter
import org.apache.spark.util.SparkClassUtils

/**
* Mapped Diagnostic Context (MDC) that will be used in log messages.
* The values of the MDC will be inlined into the log message, while the key-value pairs will be
* part of the ThreadContext.
*/
case class MDC(key: LogKey.Value, value: String)
Contributor

This would probably be more readable fully spelled-out (MappedDiagnosticContext) rather than an acronym, no?

Member Author

And we are putting the class name into the log message. I am trying to avoid a long class name here.


/**
* Wrapper class for log messages that include a logging context.
* This is used as the return type of the `log` string interpolator defined in `LogStringContext`.
*/
case class MessageWithContext(message: String, context: Option[Instance])

/**
* Wrapper class for lazy evaluation of the MessageWithContext instance.
*/
class LogEntry(messageWithContext: => MessageWithContext) {
def message: String = messageWithContext.message

def context: Option[Instance] = messageWithContext.context
}

/**
* Companion object for the wrapper to enable implicit conversions
*/
object LogEntry {
import scala.language.implicitConversions

implicit def from(msgWithCtx: => MessageWithContext): LogEntry =
new LogEntry(msgWithCtx)
}

/**
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
@@ -55,6 +90,33 @@ trait Logging {
log_
}

implicit class LogStringContext(val sc: StringContext) {
def log(args: MDC*): MessageWithContext = {
val processedParts = sc.parts.iterator
val sb = new StringBuilder(processedParts.next())
lazy val map = new java.util.HashMap[String, String]()

args.foreach { mdc =>
sb.append(mdc.value)
if (Logging.isStructuredLoggingEnabled) {
map.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
}

if (processedParts.hasNext) {
sb.append(processedParts.next())
}
}

// Create a CloseableThreadContext and apply the context map
val closeableContext = if (Logging.isStructuredLoggingEnabled) {
Some(CloseableThreadContext.putAll(map))
} else {
None
}
MessageWithContext(sb.toString(), closeableContext)
}
}
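
To illustrate what the interpolator yields, here is a minimal sketch assuming the classes added in this PR are on the classpath (the object name and executor id are made up):

import org.apache.spark.internal.{LogKey, Logging, MDC}

object InterpolatorSketch extends Logging {   // hypothetical object, for illustration only
  def demo(): Unit = {
    val entry = log"Lost executor ${MDC(LogKey.EXECUTOR_ID, "42")}."
    // The MDC value is always inlined into the message text:
    println(entry.message)                    // Lost executor 42.
    // The "executor_id" -> "42" pair is pushed into the ThreadContext only when
    // structured logging is enabled; the caller closes it once the message is logged.
    entry.context.foreach(_.close())
  }
}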

// Log methods that take only a String
protected def logInfo(msg: => String): Unit = {
if (log.isInfoEnabled) log.info(msg)
@@ -76,6 +138,20 @@ trait Logging {
if (log.isErrorEnabled) log.error(msg)
}

protected def logError(entry: LogEntry): Unit = {
if (log.isErrorEnabled) {
log.error(entry.message)
entry.context.map(_.close())
}
}

protected def logError(entry: LogEntry, throwable: Throwable): Unit = {
if (log.isErrorEnabled) {
log.error(entry.message, throwable)
entry.context.map(_.close())
}
}
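
A minimal usage sketch for these overloads (the class, method, and executor id below are hypothetical, not part of this diff):

import org.apache.spark.internal.{LogKey, Logging, MDC}

class ExecutorMonitor extends Logging {   // hypothetical class, for illustration only
  def onExecutorLost(executorId: String, cause: Throwable): Unit = {
    // The MessageWithContext produced by the interpolator is converted to a LogEntry
    // via the implicit in LogEntry's companion object; logError writes the message and
    // then closes the ThreadContext entry that carried the MDC pair.
    logError(log"Lost executor ${MDC(LogKey.EXECUTOR_ID, executorId)}.", cause)
  }
}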

// Log methods that take Throwables (Exceptions/Errors) too
protected def logInfo(msg: => String, throwable: Throwable): Unit = {
if (log.isInfoEnabled) log.info(msg, throwable)
@@ -132,7 +208,11 @@ trait Logging {
// scalastyle:off println
if (Logging.islog4j2DefaultConfigured()) {
Logging.defaultSparkLog4jConfig = true
val defaultLogProps = "org/apache/spark/log4j2-defaults.properties"
val defaultLogProps = if (Logging.isStructuredLoggingEnabled) {
"org/apache/spark/log4j2-defaults.properties"
} else {
"org/apache/spark/log4j2-pattern-layout-defaults.properties"
}
Option(SparkClassUtils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
val context = LogManager.getContext(false).asInstanceOf[LoggerContext]
@@ -190,6 +270,7 @@ private[spark] object Logging {
@volatile private var initialized = false
@volatile private var defaultRootLevel: Level = null
@volatile private var defaultSparkLog4jConfig = false
@volatile private var structuredLoggingEnabled = true
@volatile private[spark] var sparkShellThresholdLevel: Level = null
@volatile private[spark] var setLogLevelPrinted: Boolean = false

@@ -259,6 +340,26 @@ private[spark] object Logging {
.getConfiguration.isInstanceOf[DefaultConfiguration])
}

/**
* Enable Structured logging framework.
*/
private[spark] def enableStructuredLogging(): Unit = {
structuredLoggingEnabled = true
}

/**
* Disable Structured logging framework.
*/
private[spark] def disableStructuredLogging(): Unit = {
structuredLoggingEnabled = false
}

/**
* Return true if Structured logging framework is enabled.
*/
private[spark] def isStructuredLoggingEnabled: Boolean = {
structuredLoggingEnabled
}
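
A short sketch of how these switches tie into initializeLogging above (illustrative only; the methods are private[spark], so this compiles only from a package under org.apache.spark, and the package and object names are made up):

package org.apache.spark.example

import org.apache.spark.internal.Logging

object LayoutToggleSketch {
  def main(args: Array[String]): Unit = {
    // Fall back to the classic PatternLayout defaults instead of the JSON template:
    // initializeLogging will then load log4j2-pattern-layout-defaults.properties.
    Logging.disableStructuredLogging()
    println(Logging.isStructuredLoggingEnabled)   // false
  }
}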

private[spark] class SparkShellLoggingFilter extends AbstractFilter {
private var status = LifeCycle.State.INITIALIZING
50 changes: 50 additions & 0 deletions common/utils/src/test/resources/log4j2.properties
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

rootLogger.level = info
rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}

appender.file.type = File
appender.file.name = File
appender.file.fileName = target/unit-tests.log
appender.file.layout.type = JsonTemplateLayout
appender.file.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Structured Logging Appender
appender.structured.type = File
appender.structured.name = structured
appender.structured.fileName = target/structured.log
appender.structured.layout.type = JsonTemplateLayout
appender.structured.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Pattern Logging Appender
appender.pattern.type = File
appender.pattern.name = pattern
appender.pattern.fileName = target/pattern.log
appender.pattern.layout.type = PatternLayout
appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Custom loggers
logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
logger.structured.level = info
logger.structured.appenderRefs = structured
logger.structured.appenderRef.structured.ref = structured

logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
logger.pattern.level = info
logger.pattern.appenderRefs = pattern
logger.pattern.appenderRef.pattern.ref = pattern
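
For illustration, a suite routed through the structured appender could read target/structured.log back and check that each line carries the fields declared in SparkLayout.json. This is only a rough sketch under that assumption (the object name and checks are made up, not the actual test suites):

import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

object StructuredLogCheck {
  def main(args: Array[String]): Unit = {
    // JsonTemplateLayout writes one JSON object per line.
    val lines = Files.readAllLines(Paths.get("target/structured.log")).asScala
    // Crude presence check; a real test would parse the JSON instead.
    val expectedFields = Seq("\"ts\"", "\"level\"", "\"msg\"", "\"logger\"")
    val allPresent = lines.forall(line => expectedFields.forall(f => line.contains(f)))
    println(s"all entries contain the expected fields: $allPresent")
  }
}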