diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index d4e22d739098f..c57894b9f4f8f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -130,7 +130,7 @@ private[spark] class EventLoggingListener(
   }
 
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
-    logEvent(redactEvent(event))
+    logEvent(redactEvent(sparkConf, event))
   }
 
   // Events that trigger a flush
@@ -295,8 +295,15 @@ private[spark] class EventLoggingListener(
     }
     redactedProperties
   }
+}
+
+private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
+  // Dummy stage key used by driver in executor metrics updates
+  val DRIVER_STAGE_KEY = (-1, -1)
 
   private[spark] def redactEvent(
+      sparkConf: SparkConf,
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
     // environmentDetails maps a string descriptor to a set of properties
     // Similar to:
@@ -312,11 +319,4 @@ private[spark] class EventLoggingListener(
     }
     SparkListenerEnvironmentUpdate(redactedProps)
   }
-
-}
-
-private[spark] object EventLoggingListener extends Logging {
-  val DEFAULT_LOG_DIR = "/tmp/spark-events"
-  // Dummy stage key used by driver in executor metrics updates
-  val DRIVER_STAGE_KEY = (-1, -1)
 }
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 51cd7d1284ff3..3520fa870c91b 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -27,6 +27,8 @@ import com.codahale.metrics.Timer
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.scheduler.EventLoggingListener
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
 
 /**
  * An event bus which posts events to its listeners.
@@ -128,7 +130,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       if (maybeTimerContext != null) {
         val elapsed = maybeTimerContext.stop()
         if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
-          logInfo(s"Process of event ${event} by listener ${listenerName} took " +
+          logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
             s"${elapsed / 1000000000d}s.")
         }
       }
@@ -150,4 +152,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
   }
 
+  private def redactEvent(e: E): E = {
+    e match {
+      case event: SparkListenerEnvironmentUpdate =>
+        EventLoggingListener.redactEvent(env.conf, event).asInstanceOf[E]
+      case _ => e
+    }
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7acb8451e3b38..240774d854c92 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -90,11 +90,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val conf = getLoggingConf(testDirPath, None)
       .set(key, secretPassword)
     val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
     val envDetails = SparkEnv.environmentDetails(
       conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty)
     val event = SparkListenerEnvironmentUpdate(envDetails)
-    val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
+    val redactedProps = EventLoggingListener
+      .redactEvent(conf, event).environmentDetails("Spark Properties").toMap
     assert(redactedProps(key) == "*********(redacted)")
   }
 
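For reference, a minimal sketch (not part of the patch) of how the relocated `EventLoggingListener.redactEvent` helper behaves. The property key and value below are made up; redaction is driven by `spark.redaction.regex`, whose default pattern matches keys containing e.g. "secret" or "password" (case-insensitively), and the helper is `private[spark]`, so a standalone sketch has to live under the `org.apache.spark` package:

```scala
package org.apache.spark

import org.apache.spark.scheduler.{EventLoggingListener, SparkListenerEnvironmentUpdate}

// Hypothetical driver illustrating the helper this patch moves to the
// companion object, so ListenerBus can call it without holding an
// EventLoggingListener instance. The key/value are illustrative only.
object RedactEventSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val event = SparkListenerEnvironmentUpdate(
      Map("Spark Properties" -> Seq("spark.my.password" -> "hunter2")))
    // "spark.my.password" matches the default spark.redaction.regex,
    // so its value is replaced before the event is rendered anywhere.
    val redacted = EventLoggingListener.redactEvent(conf, event)
    println(redacted.environmentDetails("Spark Properties").head)
    // Expected output: (spark.my.password,*********(redacted))
  }
}
```

This is also why the ListenerBus change above passes `env.conf` (the `SparkEnv` the trait already caches) rather than constructing a listener: the helper only needs a `SparkConf` to resolve the redaction pattern before the slow-event message is logged.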