Skip to content

Commit

Permalink
[SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log
Browse files Browse the repository at this point in the history
  • Loading branch information
warrenzhu25 committed Jan 26, 2021
1 parent d1177b5 commit 618ddc4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[spark] class EventLoggingListener(
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
logEvent(redactEvent(event))
logEvent(redactEvent(sparkConf, event))
}

// Events that trigger a flush
Expand Down Expand Up @@ -295,8 +295,15 @@ private[spark] class EventLoggingListener(
}
redactedProperties
}
}

private[spark] object EventLoggingListener extends Logging {
val DEFAULT_LOG_DIR = "/tmp/spark-events"
// Dummy stage key used by driver in executor metrics updates
val DRIVER_STAGE_KEY = (-1, -1)

private[spark] def redactEvent(
sparkConf: SparkConf,
event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
// environmentDetails maps a string descriptor to a set of properties
// Similar to:
Expand All @@ -312,11 +319,4 @@ private[spark] class EventLoggingListener(
}
SparkListenerEnvironmentUpdate(redactedProps)
}

}

private[spark] object EventLoggingListener extends Logging {
val DEFAULT_LOG_DIR = "/tmp/spark-events"
// Dummy stage key used by driver in executor metrics updates
val DRIVER_STAGE_KEY = (-1, -1)
}
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.codahale.metrics.Timer

import org.apache.spark.SparkEnv
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate

/**
* An event bus which posts events to its listeners.
Expand Down Expand Up @@ -128,7 +130,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logInfo(s"Process of event ${event} by listener ${listenerName} took " +
logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
s"${elapsed / 1000000000d}s.")
}
}
Expand All @@ -150,4 +152,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}

private def redactEvent(e: E): E = {
e match {
case event: SparkListenerEnvironmentUpdate =>
EventLoggingListener.redactEvent(env.conf, event).asInstanceOf[E]
case _ => e
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val conf = getLoggingConf(testDirPath, None)
.set(key, secretPassword)
val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
val envDetails = SparkEnv.environmentDetails(
conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty)
val event = SparkListenerEnvironmentUpdate(envDetails)
val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
val redactedProps = EventLoggingListener
.redactEvent(conf, event).environmentDetails("Spark Properties").toMap
assert(redactedProps(key) == "*********(redacted)")
}

Expand Down

0 comments on commit 618ddc4

Please sign in to comment.