From 2a191cf84cdc44a0e29c672f6d8d7241843d007d Mon Sep 17 00:00:00 2001
From: ankurgupta
Date: Mon, 28 Jan 2019 11:48:37 -0800
Subject: [PATCH 1/3] [SPARK-26753][CORE] Fix for ensuring custom log levels
 for packages/classes work for spark-shell

This fix replaces the Threshold with a Filter on the ConsoleAppender, which
checks that either the log level of the event is at or above the
thresholdLevel (the shell log level) or that the log originated from a
custom-defined logger. In either of these cases it lets the log event
through; otherwise it denies the event.

Testing Done:
1. Ensured that custom log level works when set by default (via log4j.properties)
2. Ensured that logs are not printed twice when log level is changed by setLogLevel
3. Ensured that custom logs are printed when log level is changed back by setLogLevel
---
 .../org/apache/spark/internal/Logging.scala   | 28 ++++++++-----
 .../scala/org/apache/spark/util/Utils.scala   | 41 ++++++++++++++++++-
 2 files changed, 57 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 00db9af846ab9..145d70e06d004 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -25,7 +25,7 @@ import org.apache.log4j._
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
 
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkShellLoggingFilter, Utils}
 
 /**
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -156,14 +156,7 @@ trait Logging {
       }
       rootLogger.getAllAppenders().asScala.foreach {
         case ca: ConsoleAppender =>
-          Option(ca.getThreshold()) match {
-            case Some(t) =>
-              Logging.consoleAppenderToThreshold.put(ca, t)
-              if (!t.isGreaterOrEqual(replLevel)) {
-                ca.setThreshold(replLevel)
-              }
-            case None => ca.setThreshold(replLevel)
-          }
+          ca.addFilter(new SparkShellLoggingFilter(replLevel))
         case _ => // no-op
       }
     }
@@ -182,7 +175,6 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
-  private val consoleAppenderToThreshold = new ConcurrentHashMap[ConsoleAppender, Priority]()
 
   val initLock = new Object()
   try {
@@ -213,7 +205,21 @@ private[spark] object Logging {
       rootLogger.setLevel(defaultRootLevel)
       rootLogger.getAllAppenders().asScala.foreach {
         case ca: ConsoleAppender =>
-          ca.setThreshold(consoleAppenderToThreshold.get(ca))
+          // SparkShellLoggingFilter is the last filter
+          ca.getFirstFilter() match {
+            case ssf: SparkShellLoggingFilter =>
+              ca.clearFilters()
+            case f: org.apache.log4j.spi.Filter =>
+              var previous = f
+              var current = previous.getNext()
+              while (current != null && !current.isInstanceOf[SparkShellLoggingFilter]) {
+                previous = current;
+                current = previous.getNext()
+              }
+              if (current != null) {
+                previous.setNext(current.getNext())
+              }
+          }
         case _ => // no-op
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0ad1ffc28b09d..f3d47d65ca85e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -53,6 +53,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.log4j.{Level, LogManager}
+import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.eclipse.jetty.util.MultiException
 import org.slf4j.Logger
 
@@ -2285,7 +2287,15 @@ private[spark] object Utils extends Logging {
     val rootLogger = org.apache.log4j.Logger.getRootLogger()
     rootLogger.setLevel(l)
     rootLogger.getAllAppenders().asScala.foreach {
-      case ca: org.apache.log4j.ConsoleAppender => ca.setThreshold(l)
+      case ca: org.apache.log4j.ConsoleAppender =>
+        var f = ca.getFirstFilter()
+        while (f != null) {
+          f match {
+            case ssf: SparkShellLoggingFilter =>
+              ssf.setThresholdLevel(l)
+          }
+          f = f.getNext()
+        }
       case _ => // no-op
     }
   }
@@ -2991,3 +3001,32 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream
     new String(nonCircularBuffer, StandardCharsets.UTF_8)
   }
 }
+
+private[spark] class SparkShellLoggingFilter(var thresholdLevel: Level) extends Filter {
+
+  /**
+   * If log level of event is lower than thresholdLevel, then the decision is made based on
+   * whether the log came from root or some custom configuration
+   * @param loggingEvent
+   * @return decision for accept/deny log event
+   */
+  def decide(loggingEvent: LoggingEvent): Int = {
+    val rootLevel = LogManager.getRootLogger().getLevel()
+    if (loggingEvent.getLevel().isGreaterOrEqual(thresholdLevel) ||
+      !loggingEvent.getLevel().eq(rootLevel)) {
+      return Filter.NEUTRAL
+    }
+    var logger = loggingEvent.getLogger()
+    while(logger.getParent() != null) {
+      if (logger.getLevel() != null) {
+        return Filter.NEUTRAL
+      }
+      logger = logger.getParent()
+    }
+    return Filter.DENY
+  }
+
+  private[spark] def setThresholdLevel(level: Level): Unit = {
+    thresholdLevel = level
+  }
+}

From 7d8fb20c8f51713904666c4bddf6e00fba10fed2 Mon Sep 17 00:00:00 2001
From: ankurgupta
Date: Tue, 29 Jan 2019 10:21:09 -0800
Subject: [PATCH 2/3] Review comments: Part 1

1. Moved SparkShellLoggingFilter from Utils.scala to Logging.scala
2. Moved thresholdLevel from Filter to Logging.scala
3. Added unit test cases
---
 .../org/apache/spark/internal/Logging.scala   | 42 ++++++++++++--
 .../scala/org/apache/spark/util/Utils.scala   | 45 +--------------
 .../apache/spark/internal/LoggingSuite.scala  | 57 +++++++++++++++++++
 3 files changed, 96 insertions(+), 48 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 145d70e06d004..b593ac0c605ba 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.internal
 
-import java.util.concurrent.ConcurrentHashMap
-
 import scala.collection.JavaConverters._
 
 import org.apache.log4j._
+import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
 
-import org.apache.spark.util.{SparkShellLoggingFilter, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -154,9 +153,10 @@ trait Logging {
           System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
             "For SparkR, use setLogLevel(newLevel).")
         }
+        Logging.sparkShellThresholdLevel = replLevel
         rootLogger.getAllAppenders().asScala.foreach {
           case ca: ConsoleAppender =>
-            ca.addFilter(new SparkShellLoggingFilter(replLevel))
+            ca.addFilter(new SparkShellLoggingFilter())
           case _ => // no-op
         }
       }
@@ -175,6 +175,7 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
+  @volatile private[spark] var sparkShellThresholdLevel: Level = null
 
   val initLock = new Object()
   try {
@@ -209,7 +210,7 @@ private[spark] object Logging {
           ca.getFirstFilter() match {
             case ssf: SparkShellLoggingFilter =>
               ca.clearFilters()
-            case f: org.apache.log4j.spi.Filter =>
+            case f: Filter =>
               var previous = f
               var current = previous.getNext()
               while (current != null && !current.isInstanceOf[SparkShellLoggingFilter]) {
@@ -219,6 +220,7 @@ private[spark] object Logging {
               if (current != null) {
                 previous.setNext(current.getNext())
               }
+            case _ => // no-op
           }
         case _ => // no-op
       }
@@ -235,3 +237,33 @@ private[spark] object Logging {
     "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
   }
 }
+
+private class SparkShellLoggingFilter() extends Filter {
+
+  /**
+   * If sparkShellThresholdLevel is not defined, this filter is a no-op.
+   * If log level of event is lower than thresholdLevel, then the decision is made based on
+   * whether the log came from root or some custom configuration
+   * @param loggingEvent
+   * @return decision for accept/deny log event
+   */
+  def decide(loggingEvent: LoggingEvent): Int = {
+    val thresholdLevel = Logging.sparkShellThresholdLevel
+    if (thresholdLevel == null) {
+      return Filter.NEUTRAL
+    }
+    val rootLevel = LogManager.getRootLogger().getLevel()
+    if (loggingEvent.getLevel().isGreaterOrEqual(thresholdLevel) ||
+      !loggingEvent.getLevel().eq(rootLevel)) {
+      return Filter.NEUTRAL
+    }
+    var logger = loggingEvent.getLogger()
+    while (logger.getParent() != null) {
+      if (logger.getLevel() != null) {
+        return Filter.NEUTRAL
+      }
+      logger = logger.getParent()
+    }
+    return Filter.DENY
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f3d47d65ca85e..37b21e2eddba6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -53,8 +53,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.{Level, LogManager}
-import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.eclipse.jetty.util.MultiException
 import org.slf4j.Logger
 
@@ -2286,18 +2284,8 @@ private[spark] object Utils extends Logging {
   def setLogLevel(l: org.apache.log4j.Level) {
     val rootLogger = org.apache.log4j.Logger.getRootLogger()
     rootLogger.setLevel(l)
-    rootLogger.getAllAppenders().asScala.foreach {
-      case ca: org.apache.log4j.ConsoleAppender =>
-        var f = ca.getFirstFilter()
-        while (f != null) {
-          f match {
-            case ssf: SparkShellLoggingFilter =>
-              ssf.setThresholdLevel(l)
-          }
-          f = f.getNext()
-        }
-      case _ => // no-op
-    }
+    // Setting threshold to null as rootLevel will define log level for spark-shell
+    Logging.sparkShellThresholdLevel = null
   }
 
   /**
@@ -3001,32 +2989,3 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream
     new String(nonCircularBuffer, StandardCharsets.UTF_8)
   }
 }
-
-private[spark] class SparkShellLoggingFilter(var thresholdLevel: Level) extends Filter {
-
-  /**
-   * If log level of event is lower than thresholdLevel, then the decision is made based on
-   * whether the log came from root or some custom configuration
-   * @param loggingEvent
-   * @return decision for accept/deny log event
-   */
-  def decide(loggingEvent: LoggingEvent): Int = {
-    val rootLevel = LogManager.getRootLogger().getLevel()
-    if (loggingEvent.getLevel().isGreaterOrEqual(thresholdLevel) ||
-      !loggingEvent.getLevel().eq(rootLevel)) {
-      return Filter.NEUTRAL
-    }
-    var logger = loggingEvent.getLogger()
-    while(logger.getParent() != null) {
-      if (logger.getLevel() != null) {
-        return Filter.NEUTRAL
-      }
-      logger = logger.getParent()
-    }
-    return Filter.DENY
-  }
-
-  private[spark] def setThresholdLevel(level: Level): Unit = {
-    thresholdLevel = level
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala b/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
new file mode 100644
index 0000000000000..6c2a78a334eae
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.spi.{Filter, LoggingEvent}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
+
+class LoggingSuite extends SparkFunSuite {
+
+  test("spark-shell logging filter") {
+    val ssf = new SparkShellLoggingFilter()
+    val rootLogger = Logger.getRootLogger()
+    val originalLevel = rootLogger.getLevel()
+    rootLogger.setLevel(Level.INFO)
+    Logging.sparkShellThresholdLevel = Level.WARN
+    try {
+      val logger = Logger.getLogger("a.b.c.D")
+      val logEvent = new LoggingEvent(logger.getName(), logger, Level.INFO, "Test", null)
+      assert(ssf.decide(logEvent) === Filter.DENY)
+
+      // log level is less than threshold level but different from root level
+      val logEvent1 = new LoggingEvent(logger.getName(), logger, Level.DEBUG, "Test", null)
+      assert(ssf.decide(logEvent1) != Filter.DENY)
+
+      // custom log level configured
+      val parentLogger = Logger.getLogger("a.b.c")
+      parentLogger.setLevel(Level.INFO)
+      assert(ssf.decide(logEvent) != Filter.DENY)
+
+      // log level is greater than or equal to threshold level
+      val logger2 = Logger.getLogger("a.b.E")
+      val logEvent2 = new LoggingEvent(logger2.getName(), logger2, Level.INFO, "Test", null)
+      Utils.setLogLevel(Level.INFO)
+      assert(ssf.decide(logEvent2) != Filter.DENY)
+    } finally {
+      rootLogger.setLevel(originalLevel)
+    }
+  }
+}

From 893ff759cff0df3aa1d2773de21e3f4adcfed0c8 Mon Sep 17 00:00:00 2001
From: ankurgupta
Date: Tue, 29 Jan 2019 13:26:13 -0800
Subject: [PATCH 3/3] Review comments: Part 2

1. Changed uninitialize() method of Logging.scala
2. Other minor modifications
---
 .../org/apache/spark/internal/Logging.scala   | 31 +++----------------
 .../apache/spark/internal/LoggingSuite.scala  |  2 ++
 2 files changed, 7 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index b593ac0c605ba..e87720301cd2a 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -204,26 +204,7 @@ private[spark] object Logging {
     } else {
       val rootLogger = LogManager.getRootLogger()
       rootLogger.setLevel(defaultRootLevel)
-      rootLogger.getAllAppenders().asScala.foreach {
-        case ca: ConsoleAppender =>
-          // SparkShellLoggingFilter is the last filter
-          ca.getFirstFilter() match {
-            case ssf: SparkShellLoggingFilter =>
-              ca.clearFilters()
-            case f: Filter =>
-              var previous = f
-              var current = previous.getNext()
-              while (current != null && !current.isInstanceOf[SparkShellLoggingFilter]) {
-                previous = current;
-                current = previous.getNext()
-              }
-              if (current != null) {
-                previous.setNext(current.getNext())
-              }
-            case _ => // no-op
-          }
-        case _ => // no-op
-      }
+      sparkShellThresholdLevel = null
     }
   }
   this.initialized = false
@@ -242,19 +223,17 @@ private class SparkShellLoggingFilter() extends Filter {
 
   /**
   * If sparkShellThresholdLevel is not defined, this filter is a no-op.
-   * If log level of event is lower than thresholdLevel, then the decision is made based on
-   * whether the log came from root or some custom configuration
+   * If log level of event is not equal to root level, the event is allowed. Otherwise,
+   * the decision is made based on whether the log came from root or some custom configuration
   * @param loggingEvent
   * @return decision for accept/deny log event
   */
   def decide(loggingEvent: LoggingEvent): Int = {
-    val thresholdLevel = Logging.sparkShellThresholdLevel
-    if (thresholdLevel == null) {
+    if (Logging.sparkShellThresholdLevel == null) {
       return Filter.NEUTRAL
     }
     val rootLevel = LogManager.getRootLogger().getLevel()
-    if (loggingEvent.getLevel().isGreaterOrEqual(thresholdLevel) ||
-      !loggingEvent.getLevel().eq(rootLevel)) {
+    if (!loggingEvent.getLevel().eq(rootLevel)) {
       return Filter.NEUTRAL
     }
     var logger = loggingEvent.getLogger()
diff --git a/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala b/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
index 6c2a78a334eae..250ac3dafcabc 100644
--- a/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
@@ -30,6 +30,7 @@ class LoggingSuite extends SparkFunSuite {
     val rootLogger = Logger.getRootLogger()
     val originalLevel = rootLogger.getLevel()
     rootLogger.setLevel(Level.INFO)
+    val originalThreshold = Logging.sparkShellThresholdLevel
     Logging.sparkShellThresholdLevel = Level.WARN
     try {
       val logger = Logger.getLogger("a.b.c.D")
@@ -52,6 +53,7 @@ class LoggingSuite extends SparkFunSuite {
       assert(ssf.decide(logEvent2) != Filter.DENY)
     } finally {
       rootLogger.setLevel(originalLevel)
+      Logging.sparkShellThresholdLevel = originalThreshold
     }
   }
 }
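Note (editor's addition, not part of the patch series): the sketch below is a minimal, standalone re-implementation of the decision rules as they stand after PATCH 3/3, written against the plain log4j 1.x API so the behavior can be exercised outside Spark. The object name FilterDecisionSketch, the logger names under "x.y", and passing the shell threshold as an explicit argument (rather than reading Logging.sparkShellThresholdLevel) are illustrative assumptions, not code from the PR.

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.log4j.spi.{Filter, LoggingEvent}

object FilterDecisionSketch {

  // Mirrors SparkShellLoggingFilter.decide() after PATCH 3/3. Note that only
  // the null/non-null state of the shell threshold matters at this point: the
  // level comparison against the threshold was dropped in the final revision.
  def decide(event: LoggingEvent, shellThreshold: Level): Int = {
    if (shellThreshold == null) {
      // No shell threshold installed (e.g. after setLogLevel): filter is a no-op.
      return Filter.NEUTRAL
    }
    if (!event.getLevel().eq(LogManager.getRootLogger().getLevel())) {
      // Event level differs from the root level, so it cannot be a plain
      // root-inherited log; leave the decision to normal level checks.
      return Filter.NEUTRAL
    }
    var logger = event.getLogger()
    while (logger.getParent() != null) { // walk ancestors up to the root
      if (logger.getLevel() != null) {
        return Filter.NEUTRAL // a custom level is configured on the chain
      }
      logger = logger.getParent()
    }
    Filter.DENY // visible only because of the root config: suppress in shell
  }

  def main(args: Array[String]): Unit = {
    LogManager.getRootLogger().setLevel(Level.INFO)

    // A logger with no explicit level anywhere on its chain: its INFO event
    // is denied, since it would reach the console purely via the root level.
    val plain = Logger.getLogger("x.y.Plain")
    val event = new LoggingEvent(plain.getName(), plain, Level.INFO, "msg", null)
    assert(decide(event, Level.WARN) == Filter.DENY)

    // Once an ancestor gets a user-defined level, the same event is allowed.
    Logger.getLogger("x.y").setLevel(Level.INFO)
    assert(decide(event, Level.WARN) == Filter.NEUTRAL)
  }
}

This illustrates the trade-off the review converged on: only events whose visibility comes purely from the root logger's configuration are suppressed while a shell threshold is installed, while anything covered by user-defined logger levels falls through to log4j's ordinary level handling.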