49 changes: 33 additions & 16 deletions core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.internal
 
-import java.util.concurrent.ConcurrentHashMap
-
 import scala.collection.JavaConverters._
 
 import org.apache.log4j._
+import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
@@ -154,16 +153,10 @@ trait Logging {
         System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
           "For SparkR, use setLogLevel(newLevel).")
       }
+      Logging.sparkShellThresholdLevel = replLevel
       rootLogger.getAllAppenders().asScala.foreach {
         case ca: ConsoleAppender =>
-          Option(ca.getThreshold()) match {
-            case Some(t) =>
-              Logging.consoleAppenderToThreshold.put(ca, t)
-              if (!t.isGreaterOrEqual(replLevel)) {
-                ca.setThreshold(replLevel)
-              }
-            case None => ca.setThreshold(replLevel)
-          }
+          ca.addFilter(new SparkShellLoggingFilter())
         case _ => // no-op
       }
     }
@@ -182,7 +175,7 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
-  private val consoleAppenderToThreshold = new ConcurrentHashMap[ConsoleAppender, Priority]()
+  @volatile private[spark] var sparkShellThresholdLevel: Level = null
 
   val initLock = new Object()
   try {
@@ -211,11 +204,7 @@ private[spark] object Logging {
       } else {
         val rootLogger = LogManager.getRootLogger()
         rootLogger.setLevel(defaultRootLevel)
-        rootLogger.getAllAppenders().asScala.foreach {
-          case ca: ConsoleAppender =>
-            ca.setThreshold(consoleAppenderToThreshold.get(ca))
-          case _ => // no-op
-        }
+        sparkShellThresholdLevel = null
       }
     }
     this.initialized = false
@@ -229,3 +218,31 @@ private[spark] object Logging {
     "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
   }
 }
+
+private class SparkShellLoggingFilter() extends Filter {

[Inline review comment from a Contributor on the line above: remove ()]
+
+  /**
+   * If sparkShellThresholdLevel is not defined, this filter is a no-op. If the log level
+   * of the event is not equal to the root level, the event is allowed; otherwise, the
+   * decision depends on whether the log came from the root logger or from a logger with
+   * a custom configuration.
+   * @param loggingEvent the event to decide on
+   * @return the decision to accept or deny the log event
+   */
+  def decide(loggingEvent: LoggingEvent): Int = {
+    if (Logging.sparkShellThresholdLevel == null) {
+      return Filter.NEUTRAL
+    }
+    val rootLevel = LogManager.getRootLogger().getLevel()
+    if (!loggingEvent.getLevel().eq(rootLevel)) {
+      return Filter.NEUTRAL
+    }
+    var logger = loggingEvent.getLogger()
+    while (logger.getParent() != null) {
+      if (logger.getLevel() != null) {
+        return Filter.NEUTRAL
+      }
+      logger = logger.getParent()
+    }
+    return Filter.DENY
+  }
+}
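Read as a whole, decide() only denies events that reach the console solely because they match the root level while nothing on the logger's parent chain is explicitly configured; user-configured loggers always pass through. Below is a minimal sketch of that behavior — hypothetical demo code, not part of this PR, assumed to live in the org.apache.spark.internal package since both SparkShellLoggingFilter and sparkShellThresholdLevel are package-private:

package org.apache.spark.internal

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.log4j.spi.{Filter, LoggingEvent}

object SparkShellFilterDemo {
  def main(args: Array[String]): Unit = {
    // Mimic the spark-shell setup: root logger at INFO, shell threshold at WARN.
    LogManager.getRootLogger().setLevel(Level.INFO)
    Logging.sparkShellThresholdLevel = Level.WARN
    val filter = new SparkShellLoggingFilter()

    // An INFO event from an unconfigured logger: its level equals the root level
    // and no ancestor has an explicit level, so the filter denies it.
    val plain = Logger.getLogger("org.apache.spark.SomeClass")
    val noisy = new LoggingEvent(plain.getName(), plain, Level.INFO, "internal noise", null)
    assert(filter.decide(noisy) == Filter.DENY)

    // The same INFO level from a logger the user configured explicitly stays
    // NEUTRAL, so user logging remains visible in the shell.
    val custom = Logger.getLogger("my.app")
    custom.setLevel(Level.INFO)
    val mine = new LoggingEvent(custom.getName(), custom, Level.INFO, "user log", null)
    assert(filter.decide(mine) == Filter.NEUTRAL)
  }
}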
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2284,10 +2284,8 @@ private[spark] object Utils extends Logging {
   def setLogLevel(l: org.apache.log4j.Level) {
     val rootLogger = org.apache.log4j.Logger.getRootLogger()
     rootLogger.setLevel(l)
-    rootLogger.getAllAppenders().asScala.foreach {
-      case ca: org.apache.log4j.ConsoleAppender => ca.setThreshold(l)
-      case _ => // no-op
-    }
+    // Setting threshold to null as rootLevel will define log level for spark-shell
+    Logging.sparkShellThresholdLevel = null
   }
 
   /**
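This means an explicit user call now wins over the shell default: once setLogLevel runs, the filter has no threshold left to enforce. A hypothetical spark-shell interaction, inferred from this diff rather than from a running build:

// In spark-shell, INFO is initially suppressed because sparkShellThresholdLevel = WARN.
sc.setLogLevel("INFO")
// SparkContext.setLogLevel delegates to Utils.setLogLevel, which sets the root
// level to INFO and nulls Logging.sparkShellThresholdLevel; the filter then
// returns NEUTRAL for every event, so INFO messages reach the console again.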
59 changes: 59 additions & 0 deletions core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal

import org.apache.log4j.{Level, Logger}
import org.apache.log4j.spi.{Filter, LoggingEvent}

import org.apache.spark.SparkFunSuite
import org.apache.spark.util.Utils

class LoggingSuite extends SparkFunSuite {

test("spark-shell logging filter") {
val ssf = new SparkShellLoggingFilter()
val rootLogger = Logger.getRootLogger()
val originalLevel = rootLogger.getLevel()
rootLogger.setLevel(Level.INFO)
val originalThreshold = Logging.sparkShellThresholdLevel
Logging.sparkShellThresholdLevel = Level.WARN
try {
val logger = Logger.getLogger("a.b.c.D")
val logEvent = new LoggingEvent(logger.getName(), logger, Level.INFO, "Test", null)
assert(ssf.decide(logEvent) === Filter.DENY)

// log level is less than threshold level but different from root level
val logEvent1 = new LoggingEvent(logger.getName(), logger, Level.DEBUG, "Test", null)
assert(ssf.decide(logEvent1) != Filter.DENY)

// custom log level configured
val parentLogger = Logger.getLogger("a.b.c")
parentLogger.setLevel(Level.INFO)
assert(ssf.decide(logEvent) != Filter.DENY)

// log level is greater than or equal to threshold level
val logger2 = Logger.getLogger("a.b.E")
val logEvent2 = new LoggingEvent(logger2.getName(), logger2, Level.INFO, "Test", null)
Utils.setLogLevel(Level.INFO)
assert(ssf.decide(logEvent2) != Filter.DENY)
} finally {
rootLogger.setLevel(originalLevel)
Logging.sparkShellThresholdLevel = originalThreshold
}
}
}
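The final assertion exercises the two changes end to end: Utils.setLogLevel(Level.INFO) clears Logging.sparkShellThresholdLevel, which turns SparkShellLoggingFilter into a no-op, so an INFO event from an unconfigured logger — denied at the start of the test — is no longer denied once the user has set a level explicitly.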