diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 00db9af846ab9..e87720301cd2a 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.internal
 
-import java.util.concurrent.ConcurrentHashMap
-
 import scala.collection.JavaConverters._
 
 import org.apache.log4j._
+import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
 
@@ -154,16 +153,10 @@ trait Logging {
           System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
             "For SparkR, use setLogLevel(newLevel).")
         }
+        Logging.sparkShellThresholdLevel = replLevel
         rootLogger.getAllAppenders().asScala.foreach {
           case ca: ConsoleAppender =>
-            Option(ca.getThreshold()) match {
-              case Some(t) =>
-                Logging.consoleAppenderToThreshold.put(ca, t)
-                if (!t.isGreaterOrEqual(replLevel)) {
-                  ca.setThreshold(replLevel)
-                }
-              case None => ca.setThreshold(replLevel)
-            }
+            ca.addFilter(new SparkShellLoggingFilter())
           case _ => // no-op
         }
       }
@@ -182,7 +175,7 @@
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
-  private val consoleAppenderToThreshold = new ConcurrentHashMap[ConsoleAppender, Priority]()
+  @volatile private[spark] var sparkShellThresholdLevel: Level = null
 
   val initLock = new Object()
   try {
@@ -211,11 +204,7 @@
       } else {
         val rootLogger = LogManager.getRootLogger()
         rootLogger.setLevel(defaultRootLevel)
-        rootLogger.getAllAppenders().asScala.foreach {
-          case ca: ConsoleAppender =>
-            ca.setThreshold(consoleAppenderToThreshold.get(ca))
-          case _ => // no-op
-        }
+        sparkShellThresholdLevel = null
       }
     }
     this.initialized = false
@@ -229,3 +218,31 @@
     "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
   }
 }
+
+private class SparkShellLoggingFilter extends Filter {
+
+  /**
+   * If sparkShellThresholdLevel is not defined, this filter is a no-op.
+   * If the event's level is not equal to the root logger's level, the event is allowed.
+   * Otherwise it is denied, unless a custom level is configured on its logger or an ancestor.
+   * @param loggingEvent the log event to evaluate
+   * @return Filter.NEUTRAL to allow the event, or Filter.DENY to suppress it
+   */
+  def decide(loggingEvent: LoggingEvent): Int = {
+    if (Logging.sparkShellThresholdLevel == null) {
+      return Filter.NEUTRAL
+    }
+    val rootLevel = LogManager.getRootLogger().getLevel()
+    if (loggingEvent.getLevel() != rootLevel) {
+      return Filter.NEUTRAL
+    }
+    var logger = loggingEvent.getLogger()
+    while (logger.getParent() != null) {
+      if (logger.getLevel() != null) {
+        return Filter.NEUTRAL
+      }
+      logger = logger.getParent()
+    }
+    return Filter.DENY
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0ad1ffc28b09d..37b21e2eddba6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2284,10 +2284,8 @@
   def setLogLevel(l: org.apache.log4j.Level) {
     val rootLogger = org.apache.log4j.Logger.getRootLogger()
     rootLogger.setLevel(l)
-    rootLogger.getAllAppenders().asScala.foreach {
-      case ca: org.apache.log4j.ConsoleAppender => ca.setThreshold(l)
-      case _ => // no-op
-    }
+    // Clear the shell threshold: an explicitly set root level now defines the log level.
+    Logging.sparkShellThresholdLevel = null
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala b/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
new file mode 100644
index 0000000000000..250ac3dafcabc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.spi.{Filter, LoggingEvent}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
+
+class LoggingSuite extends SparkFunSuite {
+
+  test("spark-shell logging filter") {
+    val ssf = new SparkShellLoggingFilter()
+    val rootLogger = Logger.getRootLogger()
+    val originalLevel = rootLogger.getLevel()
+    rootLogger.setLevel(Level.INFO)
+    val originalThreshold = Logging.sparkShellThresholdLevel
+    Logging.sparkShellThresholdLevel = Level.WARN
+    try {
+      // event level equals the root level and no custom level is configured, so deny
+      val logger = Logger.getLogger("a.b.c.D")
+      val logEvent = new LoggingEvent(logger.getName(), logger, Level.INFO, "Test", null)
+      assert(ssf.decide(logEvent) === Filter.DENY)
+
+      // event level is below the threshold but differs from the root level, so allow
+      val logEvent1 = new LoggingEvent(logger.getName(), logger, Level.DEBUG, "Test", null)
+      assert(ssf.decide(logEvent1) != Filter.DENY)
+
+      // a custom level configured on an ancestor logger allows the event
+      val parentLogger = Logger.getLogger("a.b.c")
+      parentLogger.setLevel(Level.INFO)
+      assert(ssf.decide(logEvent) != Filter.DENY)
+
+      // setting the level via Utils.setLogLevel clears the threshold, making the filter a no-op
+      val logger2 = Logger.getLogger("a.b.E")
+      val logEvent2 = new LoggingEvent(logger2.getName(), logger2, Level.INFO, "Test", null)
+      Utils.setLogLevel(Level.INFO)
+      assert(ssf.decide(logEvent2) != Filter.DENY)
+    } finally {
+      Logger.getLogger("a.b.c").setLevel(null) // clear the custom level set above
+      rootLogger.setLevel(originalLevel)
+      Logging.sparkShellThresholdLevel = originalThreshold
+    }
+  }
+}
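
For illustration, here is a minimal, self-contained sketch of the behavior this patch enables, runnable outside Spark with log4j 1.2 on the classpath. SparkShellFilterDemo, DemoFilter, and threshold are hypothetical names for this sketch only: DemoFilter mirrors the decide() logic of SparkShellLoggingFilter, and threshold stands in for Logging.sparkShellThresholdLevel, since both are private to org.apache.spark.internal.

import org.apache.log4j.{ConsoleAppender, Level, LogManager, Logger, PatternLayout}
import org.apache.log4j.spi.{Filter, LoggingEvent}

object SparkShellFilterDemo {

  // Stand-in for Logging.sparkShellThresholdLevel (demo only).
  @volatile var threshold: Level = Level.WARN

  // Mirrors SparkShellLoggingFilter#decide: deny events that merely inherit the
  // root logger's level while a shell threshold is in effect.
  class DemoFilter extends Filter {
    def decide(event: LoggingEvent): Int = {
      if (threshold == null) {
        return Filter.NEUTRAL
      }
      if (event.getLevel() != LogManager.getRootLogger().getLevel()) {
        return Filter.NEUTRAL
      }
      var logger = event.getLogger()
      while (logger.getParent() != null) {
        if (logger.getLevel() != null) {
          return Filter.NEUTRAL
        }
        logger = logger.getParent()
      }
      Filter.DENY
    }
  }

  def main(args: Array[String]): Unit = {
    val root = LogManager.getRootLogger()
    root.setLevel(Level.INFO)
    val ca = new ConsoleAppender(new PatternLayout("%p %c - %m%n"))
    ca.addFilter(new DemoFilter)
    root.addAppender(ca)

    // Suppressed: INFO equals the root level and a.b.c has no custom level.
    Logger.getLogger("a.b.c").info("hidden")
    // Printed: WARN differs from the root level, so the filter stays neutral.
    Logger.getLogger("a.b.c").warn("visible")
    // Printed: x.y has its own level, i.e. the user asked for INFO explicitly.
    val custom = Logger.getLogger("x.y")
    custom.setLevel(Level.INFO)
    custom.info("also visible")
  }
}

The design point this sketch shows: filtering at the appender, instead of raising the ConsoleAppender threshold, lets anything that differs from the root level or was configured explicitly still reach the console, while sc.setLogLevel (via Utils.setLogLevel) clears the threshold and disables the filter entirely.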