--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
@@ -39,11 +40,13 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
+import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -149,6 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }.getOrElse(new InMemoryStore())
 
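+  // Manages disk space used by on-disk application UI stores; only created
+  // when a local store path is configured.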
+  private val diskManager = storePath.map { path =>
+    new HistoryServerDiskManager(conf, path, listing, clock)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
 
   /**
Expand Down Expand Up @@ -219,6 +226,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   private def startPolling(): Unit = {
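+    // Set up the disk manager, when configured, before the first scan of the log directory.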
+    diskManager.foreach(_.initialize())
+
     // Validate the log directory.
     val path = new Path(logDir)
     try {
Expand Down Expand Up @@ -299,63 +308,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       attempt.adminAclsGroups.getOrElse(""))
     secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-    val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) }
-
-    val (kvstore, needReplay) = uiStorePath match {
-      case Some(path) =>
-        try {
-          // The store path is not guaranteed to exist - maybe it hasn't been created, or was
-          // invalidated because changes to the event log were detected. Need to replay in that
-          // case.
-          val _replay = !path.isDirectory()
-          (createDiskStore(path, conf), _replay)
-        } catch {
-          case e: Exception =>
-            // Get rid of the old data and re-create it. The store is either old or corrupted.
-            logWarning(s"Failed to load disk store $uiStorePath for $appId.", e)
-            Utils.deleteRecursively(path)
-            (createDiskStore(path, conf), true)
-        }
-
-      case _ =>
-        (new InMemoryStore(), true)
-    }
-
-    val plugins = ServiceLoader.load(
-      classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
-    val trackingStore = new ElementTrackingStore(kvstore, conf)
-    if (needReplay) {
-      val replayBus = new ReplayListenerBus()
-      val listener = new AppStatusListener(trackingStore, conf, false,
-        lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
-      replayBus.addListener(listener)
-      for {
-        plugin <- plugins
-        listener <- plugin.createListeners(conf, trackingStore)
-      } replayBus.addListener(listener)
-      try {
-        val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
-        replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
-        trackingStore.close(false)
-      } catch {
-        case e: Exception =>
-          Utils.tryLogNonFatalError {
-            trackingStore.close()
-          }
-          uiStorePath.foreach(Utils.deleteRecursively)
-          if (e.isInstanceOf[FileNotFoundException]) {
-            return None
-          } else {
-            throw e
-          }
-      }
-    }
+    val kvstore = try {
+      diskManager match {
+        case Some(sm) =>
+          loadDiskStore(sm, appId, attempt)
+
+        case _ =>
+          createInMemoryStore(attempt)
+      }
+    } catch {
+      case _: FileNotFoundException =>
+        return None
+    }
 
     val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
       HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(),
       attempt.info.appSparkVersion)
-    plugins.foreach(_.setupUI(ui))
+    loadPlugins().foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
 
@@ -417,11 +387,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         loadedUI.lock.writeLock().unlock()
       }
 
-      // If the UI is not valid, delete its files from disk, if any. This relies on the fact that
-      // ApplicationCache will never call this method concurrently with getAppUI() for the same
-      // appId / attemptId.
-      if (!loadedUI.valid && storePath.isDefined) {
-        Utils.deleteRecursively(getStorePath(storePath.get, appId, attemptId))
+      diskManager.foreach { dm =>
+        // If the UI is not valid, delete its files from disk, if any. This relies on the fact that
+        // ApplicationCache will never call this method concurrently with getAppUI() for the same
+        // appId / attemptId.
+        dm.release(appId, attemptId, delete = !loadedUI.valid)
       }
     }
   }
@@ -569,12 +539,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     val logPath = fileStatus.getPath()
 
     val bus = new ReplayListenerBus()
     val listener = new AppListingListener(fileStatus, clock)
     bus.addListener(listener)
+    replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-    replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
     listener.applicationInfo.foreach { app =>
       // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
       // discussion on the UI lifecycle.
@@ -651,10 +620,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private def replay(
       eventLog: FileStatus,
-      appCompleted: Boolean,
       bus: ReplayListenerBus,
       eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
     val logPath = eventLog.getPath()
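+    // Completion is now derived from the log's file name (".inprogress" suffix)
+    // instead of being passed in by the caller.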
+    val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
     // and when we read the file here. That is OK -- it may result in an unnecessary refresh
@@ -664,18 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // after it's created, so we get a file size that is no bigger than what is actually read.
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
-      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
-      logInfo(s"Finished replaying $logPath")
+      bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
+      logInfo(s"Finished parsing $logPath")
     } finally {
       logInput.close()
     }
   }

   /**
-   * Return true when the application has completed.
+   * Rebuilds the application state store from its event log.
    */
-  private def isApplicationCompleted(entry: FileStatus): Boolean = {
-    !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+  private def rebuildAppStore(
+      store: KVStore,
+      eventLog: FileStatus,
+      lastUpdated: Long): Unit = {
+    // Disable async updates, since they cause higher memory usage, and it's ok to take longer
+    // to parse the event logs in the SHS.
+    val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
+    val trackingStore = new ElementTrackingStore(store, replayConf)
+    val replayBus = new ReplayListenerBus()
+    val listener = new AppStatusListener(trackingStore, replayConf, false,
+      lastUpdateTime = Some(lastUpdated))
+    replayBus.addListener(listener)
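+
+    // Also rebuild any state tracked by registered AppHistoryServerPlugins.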
+    for {
+      plugin <- loadPlugins()
+      listener <- plugin.createListeners(conf, trackingStore)
+    } replayBus.addListener(listener)
+
+    try {
+      replay(eventLog, replayBus)
+      trackingStore.close(false)
+    } catch {
+      case e: Exception =>
+        Utils.tryLogNonFatalError {
+          trackingStore.close()
+        }
+        throw e
+    }
   }

   /**
Expand Down Expand Up @@ -751,14 +746,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.write(newAppInfo)
   }
 
-  private def createDiskStore(path: File, conf: SparkConf): KVStore = {
+  private def loadDiskStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper): KVStore = {
     val metadata = new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)
-    KVUtils.open(path, metadata)
+
+    // First check if the store already exists and try to open it. If that fails, then get rid of
+    // the existing data.
+    dm.openStore(appId, attempt.info.attemptId).foreach { path =>
+      try {
+        return KVUtils.open(path, metadata)
+      } catch {
+        case e: Exception =>
+          logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
+          dm.release(appId, attempt.info.attemptId, delete = true)
+      }
+    }
+
+    // At this point the disk data either does not exist or was deleted because it failed to
+    // load, so the event log needs to be replayed.
+    val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
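+    // Compressed logs take more space once replayed; flag them so the disk manager
+    // can size the lease accordingly.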
+    val isCompressed = EventLoggingListener.codecName(status.getPath()).flatMap { name =>
+      Try(CompressionCodec.getShortName(name)).toOption
+    }.isDefined
+    logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+    val lease = dm.lease(status.getLen(), isCompressed)
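+    // Rebuild the store in the lease's temporary path; commit() promotes it to its
+    // final location, while rollback() gives the leased space back on failure.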
+    val newStorePath = try {
+      val store = KVUtils.open(lease.tmpPath, metadata)
+      try {
+        rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
+      } finally {
+        store.close()
+      }
+      lease.commit(appId, attempt.info.attemptId)
+    } catch {
+      case e: Exception =>
+        lease.rollback()
+        throw e
+    }
+
+    KVUtils.open(newStorePath, metadata)
   }
 
-  private def getStorePath(path: File, appId: String, attemptId: Option[String]): File = {
-    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
-    new File(path, fileName)
+  private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
+    val store = new InMemoryStore()
+    val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
+    rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
+    store
+  }
+
+  private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
+    ServiceLoader.load(classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
   }
 
   /** For testing. Returns internal data about a single attempt. */