22 changes: 19 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -236,6 +236,8 @@ class SparkContext(config: SparkConf) extends Logging {
def appName: String = _conf.get("spark.app.name")

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def isEventLogAsync: Boolean = _conf.getBoolean("spark.eventLog.async", false)

private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

@@ -525,9 +527,7 @@ class SparkContext(config: SparkConf) extends Logging {

_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
val logger = getEventLogger(isEventLogAsync)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -593,6 +593,22 @@ }
}
}

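/**
 * Returns the event logging listener to register on the listener bus: the buffered
 * asynchronous variant when `spark.eventLog.async` is true, otherwise the standard
 * synchronous EventLoggingListener.
 */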
private def getEventLogger(async: Boolean): EventLoggingListener = {
if (async) {
val queueSize = _conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
new AsynchronousEventLoggingListener(_applicationId,
_applicationAttemptId,
_eventLogDir.get,
_conf,
_hadoopConfiguration,
queueSize)
} else {
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
}
}
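
// Illustrative sketch only (not part of this patch): how an application could opt in to
// the asynchronous event logging introduced here. `spark.eventLog.enabled` and
// `spark.eventLog.dir` are existing Spark settings; `spark.eventLog.async` is the flag
// added above.
//
//   val conf = new SparkConf()
//     .setAppName("async-event-log-example")
//     .set("spark.eventLog.enabled", "true")
//     .set("spark.eventLog.dir", "/tmp/spark-events")
//     .set("spark.eventLog.async", "true")
//   val sc = new SparkContext(conf)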

/**
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -158,6 +158,11 @@ package object config {
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000)

private[spark] val LISTENER_BUS_EVENT_QUEUE_DROP =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.drop")
.booleanConf
.createWithDefault(true)

// This property sets the root namespace for metrics reporting
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -21,6 +21,7 @@ import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -131,20 +132,24 @@ private[spark] class EventLoggingListener(
}

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
protected def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
val eventJson = JsonProtocol.sparkEventToJson(event)
// scalastyle:off println
writer.foreach(_.println(compact(render(eventJson))))
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.hflush())
flush()
}
if (testing) {
loggedEvents += eventJson
}
}

private def flush(): Unit = {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.hflush())
}

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)

@@ -227,6 +232,7 @@ private[spark] class EventLoggingListener(
* ".inprogress" suffix.
*/
def stop(): Unit = {
flush()
writer.foreach(_.close())

val target = new Path(logPath)
@@ -250,6 +256,8 @@ }
}
}



private[spark] def redactEvent(
event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
// environmentDetails maps a string descriptor to a set of properties
@@ -267,11 +275,89 @@

}

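/**
 * A variant of EventLoggingListener that buffers incoming events in a fixed-size ring
 * buffer and writes them to the event log from a dedicated daemon thread, so that the
 * listener bus is never blocked on event log I/O. When the buffer is full, new events
 * are dropped and a warning is logged at most once per minute. The log is flushed once
 * every FLUSH_FREQUENCY buffered events and again when the listener is stopped.
 */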
private[spark] sealed class AsynchronousEventLoggingListener(
appId: String,
appAttemptId: Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration,
val bufferSize: Int)
extends EventLoggingListener(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {
import EventLoggingListener._

private lazy val eventBuffer = new Array[SparkListenerEvent](bufferSize)

private val numberOfEvents = new AtomicInteger(0)

@volatile private var writeIndex = 0
@volatile private var readIndex = 0
@volatile private var stopThread = false
@volatile private var lastReportTimestamp = 0L
@volatile private var numberOfDrop = 0
@volatile private var lastFlushEvent = 0

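// Consumer side: a single daemon thread drains the ring buffer in order, delegating each
// event to the parent logger and forcing a flush once every FLUSH_FREQUENCY events.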
private val listenerThread = new Thread(THREAD_NAME) {
setDaemon(true)
override def run(): Unit = {
while (!stopThread || numberOfEvents.get() > 0) {
if (numberOfEvents.get() > 0) {
executeLogEvent(eventBuffer(readIndex), lastFlushEvent == FLUSH_FREQUENCY)
numberOfEvents.decrementAndGet()
readIndex = (readIndex + 1) % bufferSize
if (lastFlushEvent == FLUSH_FREQUENCY) {
lastFlushEvent = 0
} else {
lastFlushEvent = lastFlushEvent + 1
}
} else {
Thread.sleep(20) // give more chance for producer thread to be scheduled
}
}
}
}

private def executeLogEvent(event: SparkListenerEvent, flushLogger: Boolean): Unit =
super.logEvent(event, flushLogger)

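// Producer side: called on the listener bus thread. The event is enqueued without blocking;
// if the buffer is full it is dropped and a rate-limited warning is logged instead.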
override protected def logEvent(event: SparkListenerEvent, flushLogger: Boolean): Unit = {
if (numberOfEvents.get() < bufferSize) {
eventBuffer(writeIndex) = event
numberOfEvents.incrementAndGet()
writeIndex = (writeIndex + 1) % bufferSize
} else {
numberOfDrop = numberOfDrop + 1
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(
s"dropped $numberOfDrop SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
numberOfDrop = 0
}
}
}

override def start(): Unit = {
super.start()
listenerThread.start()
}

override def stop(): Unit = {
stopThread = true
listenerThread.join()
super.stop()
}

}

private[spark] object EventLoggingListener extends Logging {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"

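// Name of the drain thread used by AsynchronousEventLoggingListener, and the number of
// buffered events written between forced flushes of the event log stream.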
val THREAD_NAME = "EventLoggingListener"
val FLUSH_FREQUENCY = 200

private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

// A cache for compression codecs to avoid creating the same codec many times