Closed

Changes from all commits (54 commits)
ae0b8b2
SPARK-7889 cache with initial unit tests
steveloughran Jun 22, 2015
72945e2
SPARK-7889 cache tests of refresh interval working.
steveloughran Jun 22, 2015
6764f49
SPARK-7889 switch to explicit scan of SparkUI attempts to determine c…
steveloughran Jun 22, 2015
bd47afb
SPARK-7889 use spark.history.cache.interval as config option; set def…
steveloughran Jun 22, 2015
aaadf64
SPARK-7889 resync with trunk & slightly improve code layout
steveloughran Nov 23, 2015
fe6d9e0
SPARK-7889 scalastyle
steveloughran Nov 23, 2015
f69d391
SPARK-7889 scalastyle on ApplicationCacheSuite
steveloughran Nov 23, 2015
334fc91
SPARK-7889 cleanup of comments and imports & the like
steveloughran Nov 23, 2015
c19fee2
SPARK-7889 intermin checkpoint on changes; about to move cache map fr…
steveloughran Nov 24, 2015
ec5b652
SPARK-7889 Cache designed to ask history server/provider for updates;…
steveloughran Nov 24, 2015
feda232
SPARK-7889 playing losing battles with a test
steveloughran Nov 24, 2015
04d8c64
SPARK-7889 test working, verified other tests in package work, review…
steveloughran Nov 24, 2015
9a7ca9f
SPARK-7889 - stylecheck on tests apparently skipped on mvn
steveloughran Nov 24, 2015
a1024aa
SPARK-7889 style and javadoc only
steveloughran Nov 25, 2015
6e0e26d
SPARK-7889 pull out metrics into own class, make visible for testing
steveloughran Nov 25, 2015
b3c7069
history server web UI update test from @squito
steveloughran Nov 25, 2015
ea2afbb
SPARK-7889 more on the cache suite tests: there is no evidence that e…
steveloughran Nov 25, 2015
d113c5b
SPARK-7889: tests to make sure app eviction is taking place and that …
steveloughran Nov 25, 2015
a128d8c
SPARK-7889 code style check
steveloughran Nov 25, 2015
f1c7fe5
SPARK-7889: adding return type of tests embedded inside a test method…
steveloughran Nov 25, 2015
07f1af4
SPARK-7889 starting to add web filters
steveloughran Dec 1, 2015
a33bdd7
SPARK-7889 ongoing test dev. Looks like the history provider isn't pi…
steveloughran Dec 1, 2015
9166ba6
SPARK-7889: looks like the test is failing because a new history isn'…
steveloughran Dec 1, 2015
523390a
SPARK-7889 address scalastyle warnings
steveloughran Dec 1, 2015
9831ad4
SPARK-7889 : we aren't getting an updated log file on the second para…
steveloughran Dec 1, 2015
6fdaab1
SPARK-7889: filesize update time included in probe; update thread als…
steveloughran Dec 2, 2015
163e218
SPARK-7889: not all filesystems update modtime on a rename; the Event…
steveloughran Dec 2, 2015
bc3a2e3
SPARK-7889 cache update works in tests -unreliably. Traces imply that…
steveloughran Dec 3, 2015
f81cfe1
SPARK-7889 still looking at a race condition in the test. This adds m…
steveloughran Dec 3, 2015
78a463e
SPARK-7889 moved off time differences to a simple generational counte…
steveloughran Dec 3, 2015
61e2e8b
SPARK-7889: this extends the test with
steveloughran Dec 3, 2015
aab129f
SPARK-7889 remove obsolete method that had been commented out
steveloughran Dec 3, 2015
76e7a27
SPARK-7889 review work, plus update documentation for this change and…
steveloughran Dec 4, 2015
5e583db
SPARK-7889 review changes
steveloughran Dec 8, 2015
cfa080c
SPARK-7889 review suggestions, especially removing all timestamp info…
steveloughran Dec 8, 2015
f6bd84d
[SPARK-7889] add a check of the REST API's app/jobs link too
steveloughran Dec 23, 2015
de940d7
[SPARK-7889]: provider is required to return a probe to be invoked d…
steveloughran Dec 23, 2015
4cf3b23
[SPARK-7889] review comments
steveloughran Dec 23, 2015
e99af36
SPARK-7889 lookup with handling for NoSuchElement moved into Applicat…
steveloughran Jan 13, 2016
5091e92
SPARK-7889 javadocs
steveloughran Jan 15, 2016
9f2318d
SPARK-7889 update probe is now a simple () => Boolean function
steveloughran Jan 20, 2016
d258445
[SPARK-7889] cleanup up merges after rebase. Test now failing
steveloughran Feb 4, 2016
c0250e6
[SPARK-7889] minor diagnostics enhancements; cause of test failures i…
steveloughran Feb 4, 2016
2e461e8
[SPARK-7889] working on tests
steveloughran Feb 4, 2016
80fbcf2
[SPARK-7889] switch to JSON API for listing incomplete/complete apps
steveloughran Feb 5, 2016
728b12c
[SPARK-7889] add squito's code & test for redirection on ? parameters
steveloughran Feb 5, 2016
6308a8e
simplify FSHistoryProvider
squito Feb 8, 2016
6be8faa
style
squito Feb 8, 2016
bfbf348
just some cleanup
squito Feb 8, 2016
d4740bc
mima update
squito Feb 8, 2016
488da80
refresh apps based on the log size, not on their modtime, since that …
squito Feb 8, 2016
bb737ec
cleanup from comments
squito Feb 11, 2016
2286aa8
more cleanup
squito Feb 11, 2016
04f5385
remove 'spark.history.cache.window' since those updates are now cheap
squito Feb 11, 2016
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -33,7 +33,42 @@ private[spark] case class ApplicationAttemptInfo(
private[spark] case class ApplicationHistoryInfo(
id: String,
name: String,
attempts: List[ApplicationAttemptInfo])
attempts: List[ApplicationAttemptInfo]) {

/**
* Has this application completed?
* @return true if the most recent attempt has completed
*/
def completed: Boolean = {
attempts.nonEmpty && attempts.head.completed
}
}

/**
* A probe which can be invoked to see if a loaded Web UI has been updated.
* The probe is valid only for the UI returned in the same [[LoadedAppUI]]
* instance. That is, whenever a new UI is loaded, the probe returned with it
* is the one that must be used to check whether that UI is out of date;
* previous probes must be discarded.
*/
private[history] abstract class HistoryUpdateProbe {
/**
* Return true if the history provider has a later version of the application
* attempt than the one against which this probe was constructed.
* @return true if an updated version of the application attempt is available
*/
def isUpdated(): Boolean
}

/**
* All the information returned from a call to `getAppUI()`: the new UI
* and any required update state.
* @param ui Spark UI
* @param updateProbe probe to call to check on the update state of this application attempt
*/
private[history] case class LoadedAppUI(
ui: SparkUI,
updateProbe: () => Boolean)

private[history] abstract class ApplicationHistoryProvider {

@@ -49,9 +84,10 @@ private[history] abstract class ApplicationHistoryProvider {
*
* @param appId The application ID.
* @param attemptId The application attempt ID (or None if there is no attempt ID).
- * @return The application's UI, or None if application is not found.
+ * @return a [[LoadedAppUI]] instance containing the application's UI and any state information
+ * for update probes, or `None` if the application/attempt is not found.
*/
- def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
+ def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]

/**
* Called when the server is shutting down.
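To make the probe contract documented above concrete, here is a self-contained Scala sketch (toy names, not part of this patch): each load returns a value together with a staleness probe, and a probe is valid only for the load that produced it.

final case class Loaded[A](value: A, updated: () => Boolean)

class Source {
  @volatile private var version = 0
  def bump(): Unit = { version += 1 }            // a new "history" arrives
  def load(): Loaded[Int] = {
    val loadedVersion = version                  // capture state at load time
    Loaded(loadedVersion, () => version > loadedVersion)
  }
}

object ProbeContractDemo extends App {
  val source = new Source
  var current = source.load()
  source.bump()                                  // the underlying data moves on
  if (current.updated()) {                       // the old probe reports staleness...
    current = source.load()                      // ...so reload and discard it
  }
  assert(!current.updated())                     // the fresh probe is quiet again
}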
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy.history

- import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
+ import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -33,7 +33,6 @@ import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
- import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -42,6 +41,31 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* A class that provides application history from event logs stored in the file system.
* This provider checks for new finished applications in the background periodically and
* renders the history application UI by parsing the associated event logs.
*
* == How new and updated attempts are detected ==
*
* - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
* entries in the log dir whose modification time is greater than the last scan time
* are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]]
* entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list
* of applications.
* - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
* [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size.
* - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]]
* instance is out of date, the log size of the cached instance is checked against the app last
* loaded by [[checkForLogs]].
*
* The use of log size, rather than simply relying on modification times, is needed to
* address the following issues:
* - some filesystems do not appear to update the `modtime` value whenever data is flushed to
* an open file output stream, so changes to the history may not be picked up.
* - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be
* missed.
*
* Tracking file size works given the following invariant: the logs get bigger
* as new events are added. If a format were used in which this did not hold, the mechanism would
* break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
* maintains this invariant.
*/
private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
extends ApplicationHistoryProvider with Logging {
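The size-based detection described in the class comment can be modelled in a few lines. This is a toy sketch with hypothetical names, not the provider's real API: a log needs replaying iff its current length exceeds the length recorded at its last replay.

import scala.collection.mutable

final case class LogEntry(path: String, length: Long)

object SizeScanDemo extends App {
  val lastReplayedSize = mutable.Map.empty[String, Long]

  // Mirrors the filter in checkForLogs: unseen files have a previous size of 0.
  def needsReplay(entry: LogEntry): Boolean =
    lastReplayedSize.getOrElse(entry.path, 0L) < entry.length

  val log = LogEntry("app-1.inprogress", 100L)
  assert(needsReplay(log))                        // never seen: replay it
  lastReplayedSize(log.path) = log.length

  assert(!needsReplay(log))                       // size unchanged: skip
  assert(needsReplay(log.copy(length = 250L)))    // log grew: replay again
}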
@@ -77,16 +101,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("spark-history-task-%d").setDaemon(true).build())

- // The modification time of the newest log detected during the last scan. This is used
- // to ignore logs that are older during subsequent scans, to avoid processing data that
- // is already known.
+ // The modification time of the newest log detected during the last scan. Currently only
+ // used for logging messages (logs are re-scanned based on file size, rather than modtime).
private var lastScanTime = -1L

// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()

val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()

// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]

@@ -176,18 +201,21 @@
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)

if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
} else {
logDebug("Background update thread disabled for testing")
}
}

override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values

- override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
try {
applications.get(appId).flatMap { appInfo =>
appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
@@ -210,7 +238,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
appListener.viewAcls.getOrElse(""))
- ui
+ LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
}
}
}
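The background refresh wired up in initialize() above follows a common pattern: a single-threaded daemon scheduled executor that re-runs the scan with a fixed delay. A minimal standalone sketch of that pattern, assuming Scala 2.12+ for the SAM lambdas (all names illustrative):

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object SchedulerDemo extends App {
  val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, "history-update-demo")
    t.setDaemon(true)                 // never keep the JVM alive just for background scans
    t
  }
  val pool = Executors.newSingleThreadScheduledExecutor(factory)
  pool.scheduleWithFixedDelay(() => println("checking for new logs"), 0L, 10L, TimeUnit.SECONDS)
  Thread.sleep(100)                   // let the first run fire before the demo exits
  pool.shutdown()
}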
@@ -243,12 +271,15 @@
private[history] def checkForLogs(): Unit = {
try {
val newLastScanTime = getNewLastScanTime()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
// scan for modified applications, replay and merge them
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
- !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
+ val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+ !entry.isDirectory() && prevFileSize < entry.getLen()
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -262,6 +293,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
entry1.getModificationTime() >= entry2.getModificationTime()
}

if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
}
logInfos.grouped(20)
.map { batch =>
replayExecutor.submit(new Runnable {
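As the truncated hunk above shows, checkForLogs hands replay work to an executor in groups of 20, so one large scan is split into bounded tasks. A self-contained sketch of the batching pattern (pool size and batch size are illustrative):

import java.util.concurrent.{Executors, TimeUnit}

object BatchReplayDemo extends App {
  val replayExecutor = Executors.newFixedThreadPool(2)
  val logs = (1 to 95).map(i => s"eventlog-$i")

  logs.grouped(20).foreach { batch =>             // five tasks: 20+20+20+20+15
    replayExecutor.submit(new Runnable {
      override def run(): Unit = println(s"replaying a batch of ${batch.size} logs")
    })
  }
  replayExecutor.shutdown()
  replayExecutor.awaitTermination(10, TimeUnit.SECONDS)
}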
@@ -356,7 +390,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val bus = new ReplayListenerBus()
val res = replay(fileStatus, bus)
res match {
- case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+ case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
@@ -511,6 +545,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
// Note that the eventLog may have *increased* in size between when we grabbed the FileStatus
// and when we read the file here. That is OK -- it may result in an unnecessary refresh
// when there is no update, but will not result in missing an update. We *must* prevent
// an error the other way -- if we report a size bigger (i.e. later) than the file that is
// actually read, we may never refresh the app. FileStatus is guaranteed to be static
// after it's created, so we get a file size that is no bigger than what is actually read.
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
@@ -521,7 +561,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
// try to show their UI.
if (appListener.appId.isDefined) {
- Some(new FsApplicationAttemptInfo(
+ val attemptInfo = new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
@@ -530,7 +570,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.endTime.getOrElse(-1L),
eventLog.getModificationTime(),
appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted))
+ appCompleted,
+ eventLog.getLen()
+ )
+ fileToAppInfo(logPath) = attemptInfo
+ Some(attemptInfo)
} else {
None
}
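A small worked example of the safety argument in the comment above: recording a size no larger than what was actually read can at worst cause one unnecessary refresh, while over-reporting could suppress refreshes forever. The numbers are made up for illustration.

object SizeRaceDemo extends App {
  val sizeAtStat = 100L       // length captured in the FileStatus before replay
  val sizeWhenRead = 120L     // the file had grown by the time it was read

  val recorded = sizeAtStat   // safe: never bigger than what was actually read
  // The next scan sees the true length, so the probe fires; at worst this is
  // an unnecessary refresh, never a missed one.
  assert(recorded < sizeWhenRead)

  // Recording an over-estimate (say sizeWhenRead + 50) instead could make every
  // later `recorded < latestLength` comparison false: the update is missed forever.
}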
@@ -564,12 +608,77 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}

/**
* String description for diagnostics
* @return a summary of the component state
*/
override def toString: String = {
val header = s"""
| FsHistoryProvider: logdir=$logDir,
| last scan time=$lastScanTime
| Cached application count = ${applications.size}
""".stripMargin
val sb = new StringBuilder(header)
applications.foreach(entry => sb.append(entry._2).append("\n"))
sb.toString
}

/**
* Look up an application attempt
* @param appId application ID
* @param attemptId Attempt ID, if set
* @return the matching attempt, if found
*/
def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = {
applications.get(appId).flatMap { appInfo =>
appInfo.attempts.find(_.attemptId == attemptId)
}
}

/**
* Return true iff a newer version of the UI is available. The check compares the
* file size recorded for the currently loaded UI against the size recorded the last
* time the logs were scanned.
*
* This is a very cheap operation -- the work of loading the new attempt was already done
* by [[checkForLogs]].
* @param appId application to probe
* @param attemptId attempt to probe
* @param prevFileSize the file size of the logs for the currently displayed UI
*/
private def updateProbe(
appId: String,
attemptId: Option[String],
prevFileSize: Long)(): Boolean = {
lookup(appId, attemptId) match {
case None =>
logDebug(s"Application Attempt $appId/$attemptId not found")
false
case Some(latest) =>
prevFileSize < latest.fileSize
}
}
}

private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
}
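updateProbe uses a Scala idiom worth noting: a method with a trailing empty parameter list partially applies to the () => Boolean expected by LoadedAppUI, capturing the application ID, attempt ID and file size at creation time. A standalone sketch of the same trick (illustrative names):

object CurriedProbeDemo extends App {
  @volatile var latestSize = 100L

  // Trailing empty parameter list: partial application yields () => Boolean.
  def probe(prevSize: Long)(): Boolean = prevSize < latestSize

  val isUpdated: () => Boolean = probe(100L) _   // prevSize is captured here
  assert(!isUpdated())                           // no growth yet
  latestSize = 150L
  assert(isUpdated())                            // the log grew: the probe fires
}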

/**
* Application attempt information.
*
* @param logPath path to the log file, or, for a legacy log, its directory
* @param name application name
* @param appId application ID
* @param attemptId optional attempt ID
* @param startTime start time (from playback)
* @param endTime end time (from playback). -1 if the application is incomplete.
* @param lastUpdated the modification time of the log file when this entry was built by replaying
* the history.
* @param sparkUser user running the application
* @param completed flag to indicate whether or not the application has completed.
* @param fileSize the size of the log file the last time the file was scanned for changes
*/
private class FsApplicationAttemptInfo(
val logPath: String,
val name: String,
@@ -579,10 +688,24 @@ private class FsApplicationAttemptInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
- completed: Boolean = true)
+ completed: Boolean,
+ val fileSize: Long)
extends ApplicationAttemptInfo(
- attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+ attemptId, startTime, endTime, lastUpdated, sparkUser, completed) {

/** extend the superclass string value with the extra attributes of this class */
override def toString: String = {
s"FsApplicationAttemptInfo($name, $appId," +
s" ${super.toString}, source=$logPath, size=$fileSize"
}
}

/**
* Application history information
* @param id application ID
* @param name application name
* @param attempts list of attempts, most recent first.
*/
private class FsApplicationHistoryInfo(
id: String,
override val name: String,
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -30,7 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

val allApps = parent.getApplicationList()
- .filter(_.attempts.head.completed != requestedIncomplete)
+ .filter(_.completed != requestedIncomplete)
val allAppsSize = allApps.size

val providerConfig = parent.getProviderConfig()
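The one-line HistoryPage change relies on the new ApplicationHistoryInfo.completed helper, which delegates to the most recent attempt. A self-contained sketch of the filter's behaviour (toy types, not Spark's):

final case class Attempt(attemptId: Option[String], completed: Boolean)
final case class AppHistory(id: String, attempts: List[Attempt]) {
  // Attempts are kept most recent first, so the head decides completion.
  def completed: Boolean = attempts.nonEmpty && attempts.head.completed
}

object HistoryPageFilterDemo extends App {
  val running = AppHistory("app-1", List(Attempt(Some("2"), false), Attempt(Some("1"), true)))
  val done = AppHistory("app-2", List(Attempt(Some("1"), true)))

  val requestedIncomplete = true
  val shown = List(running, done).filter(_.completed != requestedIncomplete)
  assert(shown == List(running))   // the incomplete listing shows only the running app
}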