8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
         "1.3"),
       DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."))
+        "Use spark.{driver,executor}.userClassPathFirst instead."),
+      DeprecatedConfig("spark.history.fs.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"),
+      DeprecatedConfig("spark.history.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
     configs.map { x => (x.oldName, x) }.toMap
   }
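For context, the deprecation table above is keyed by old name, so lookups are O(1). A minimal, self-contained sketch of how such a table can drive key translation — `DeprecationDemo` and `translate` are hypothetical names for illustration, not SparkConf's actual implementation:

```scala
// Hypothetical sketch of deprecated-key translation, mirroring the table above.
case class DeprecatedConfig(oldName: String, newName: String, version: String,
    deprecationMessage: String = "")

object DeprecationDemo {
  private val deprecated: Map[String, DeprecatedConfig] = Seq(
    DeprecatedConfig("spark.history.fs.updateInterval",
      "spark.history.fs.update.interval.seconds",
      "1.3", "Use spark.history.fs.update.interval.seconds instead"),
    DeprecatedConfig("spark.history.updateInterval",
      "spark.history.fs.update.interval.seconds",
      "1.3", "Use spark.history.fs.update.interval.seconds instead")
  ).map(x => (x.oldName, x)).toMap

  // Translate a possibly deprecated key to its replacement, warning when it fires.
  def translate(key: String): String = deprecated.get(key) match {
    case Some(cfg) if cfg.newName != null =>
      Console.err.println(s"$key is deprecated as of Spark ${cfg.version}. ${cfg.deprecationMessage}")
      cfg.newName
    case _ => key
  }
}
```

With this sketch, `DeprecationDemo.translate("spark.history.updateInterval")` returns the new key and emits a warning, while unknown keys pass through untouched.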

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,9 +17,13 @@

 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
@@ -44,17 +48,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
+    .map(_.toInt)
+    .getOrElse(10) * 1000

> Review comment (Contributor): We shouldn't call translateConfKey here, but I realize we need to do so if we want to warn the user. I will submit a separate patch to fix this behavior. In general I think the translateConfKey method should be private to SparkConf.
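The chain above reads the new key first, falls back to either deprecated key, and only then uses the default. A small self-contained sketch of the same fallback pattern, using a plain `Map` in place of `SparkConf` (the `updateIntervalMs` helper is illustrative):

```scala
// Prefer the new key, fall back to either deprecated key, default to 10 seconds.
def updateIntervalMs(get: String => Option[String]): Long =
  get("spark.history.fs.update.interval.seconds")
    .orElse(get("spark.history.fs.updateInterval"))
    .orElse(get("spark.history.updateInterval"))
    .map(_.toInt)
    .getOrElse(10) * 1000L

// Only a deprecated key is set, so its value still wins over the default.
val conf = Map("spark.history.updateInterval" -> "30")
assert(updateIntervalMs(conf.get) == 30000L)
```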

+  // Interval between each cleaner checks for event logs to delete
+  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
+    DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
 
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
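Utils.resolveURI is Spark-internal; roughly, it turns a bare local path into a file: URI while leaving already-schemed URIs (e.g. hdfs://) alone. A hedged approximation of that common-case behavior, for illustration only (`resolveURIish` is a made-up name):

```scala
import java.net.{URI, URISyntaxException}

// Rough approximation of Utils.resolveURI; not the real implementation.
def resolveURIish(path: String): URI = {
  try {
    val uri = new URI(path)
    if (uri.getScheme != null) return uri // already schemed, e.g. hdfs://nn:8020/logs
  } catch {
    case _: URISyntaxException => // not a valid URI; treat as a local path below
  }
  new java.io.File(path).getAbsoluteFile.toURI // bare path -> file: URI
}
```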

   private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
 
-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // Used by check event thread and clean log thread.
+  // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
+  // and applications between check task and clean task.
+  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+    .setNameFormat("spark-history-task-%d").setDaemon(true).build())

> Review comment (Contributor): how about some comment on what this pool is used for

 
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
@@ -73,25 +87,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * Return a runnable that performs the given operation on the event logs.
+   * This operation is expected to be executed periodically.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
-      }
-    }
-  }
+  private def getRunner(operateFun: () => Unit): Runnable = {
+    new Runnable() {
+      override def run() = Utils.logUncaughtExceptions {
+        operateFun()
+      }
+    }
+  }
@@ -113,12 +115,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
"Logging directory specified is not a directory: %s".format(logDir))
}

checkForLogs()

// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why this was turned off for testing, but it doesn't seem like you want to change that behavior, do you?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it.

logCheckingThread.setDaemon(true)
logCheckingThread.start()
// A task that periodically checks for event log updates on disk.
pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS)

if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
TimeUnit.MILLISECONDS)
}
}
}
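Because the pool is created with a single thread, the check task and the clean task can never run concurrently, which is what lets them share fs and the applications map without extra locking. A standalone sketch of that pattern using only the JDK (no Guava; `daemonFactory` and the printed messages are illustrative):

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Daemon threads so the scheduler never keeps the JVM alive on shutdown.
val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "history-task")
    t.setDaemon(true)
    t
  }
}

// Pool size 1: the two periodic tasks below are serialized on one thread.
val pool = Executors.newScheduledThreadPool(1, daemonFactory)

pool.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = println("checking for new event logs")
}, 0, 10, TimeUnit.SECONDS)

pool.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = println("cleaning expired event logs")
}, 0, 86400, TimeUnit.SECONDS)
```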

@@ -163,9 +170,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * applications that haven't been updated since last time the logs were checked.
    */
   private[history] def checkForLogs(): Unit = {
-    lastLogCheckTimeMs = getMonotonicTimeMs()
-    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-
     try {
       var newLastModifiedTime = lastModifiedTime
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
Expand Down Expand Up @@ -230,6 +234,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
+  /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs(): Unit = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+
+      applications.values.foreach { info =>
+        if (now - info.lastUpdated <= maxAge) {
+          appsToRetain += (info.id -> info)
+        }
+      }
+
+      applications = appsToRetain
+
+      // Scan all logs from the log directory.
+      // Only directories older than the specified max age will be deleted
+      statusList.foreach { dir =>
+        try {
+          if (now - dir.getModificationTime() > maxAge) {
+            // if path is a directory and set to true,
+            // the directory is deleted else throws an exception
+            fs.delete(dir.getPath, true)

> Review comment (Contributor): can you add a quick comment of what true means here: fs.delete(dir.getPath, true /* recursive */)

+          }
+        } catch {
+          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+        }
+      }
+    } catch {
+      case t: Exception => logError("Exception in cleaning logs", t)
+    }
+  }
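The retention rule above keeps an application only while now - lastUpdated is within the configured max age, and directory deletion applies the same cutoff to the modification time. A compact sketch of that filter, with a hypothetical `AppInfo` type standing in for FsApplicationHistoryInfo and millisecond timestamps:

```scala
// Hypothetical stand-in for FsApplicationHistoryInfo.
case class AppInfo(id: String, lastUpdated: Long)

// Keep only applications updated within maxAgeMs of `now`.
def retain(apps: Seq[AppInfo], now: Long, maxAgeMs: Long): Map[String, AppInfo] =
  apps.filter(info => now - info.lastUpdated <= maxAgeMs)
    .map(info => info.id -> info)
    .toMap

val weekMs = 7L * 24 * 60 * 60 * 1000
val now = System.currentTimeMillis()
val apps = Seq(AppInfo("app-1", now - 1000), AppInfo("app-2", now - 2 * weekMs))
assert(retain(apps, now, weekMs).keySet == Set("app-1"))
```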

/**
* Comparison function that defines the sort order for the application listing.
*
@@ -336,9 +379,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
-  /** Returns the system's mononotically increasing time. */
-  private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
-
   /**
    * Return true when the application has completed.
    */
@@ -354,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // One day
+  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }

private class FsApplicationHistoryInfo(
25 changes: 24 additions & 1 deletion docs/monitoring.md
@@ -86,7 +86,7 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.updateInterval</td>
+    <td>spark.history.fs.update.interval.seconds</td>
     <td>10</td>
     <td>
       The period, in seconds, at which information displayed by this history server is updated.
@@ -145,6 +145,29 @@ follows:
       If disabled, no access control checks are made.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.enabled</td>
+    <td>false</td>
+    <td>
+      Specifies whether the History Server should periodically clean up event logs from storage.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.interval.seconds</td>
+    <td>86400</td>
+    <td>
+      How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
+      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.maxAge.seconds</td>
+    <td>3600 * 24 * 7</td>
+    <td>
+      Job history files older than this many seconds will be deleted when the history cleaner runs.
+      Defaults to 3600 * 24 * 7 (1 week).
+    </td>
+  </tr>
 </table>
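Taken together, a retention policy based on the new options might look like this in conf/spark-defaults.conf (values are illustrative, not recommendations):

```
spark.history.fs.update.interval.seconds   10
spark.history.fs.cleaner.enabled           true
spark.history.fs.cleaner.interval.seconds  86400
spark.history.fs.cleaner.maxAge.seconds    604800
```

With these settings the history server rescans the log directory every 10 seconds, and once a day deletes event logs older than one week (604800 seconds).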

Note that in all of these UIs, the tables are sortable by clicking their headers,
making it easy to identify slow tasks, data skew, etc.