diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 481f6c93c6a8d..9a3db07aa6394 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.FileNotFoundException
+import java.io.{FileNotFoundException, IOException}
 
 import scala.collection.mutable
 
@@ -38,6 +38,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000
 
+  // Interval between each cleaning pass over the event logs (config value in seconds)
+  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleanInterval", 86400) * 1000
+
   private val logDir = conf.get("spark.history.fs.logDirectory", null)
   private val resolvedLogDir = Option(logDir)
     .map { d => Utils.resolveURI(d) }
@@ -54,6 +57,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   // is already known.
   private var lastModifiedTime = -1L
 
+  // Timestamp (in ms) of the last event log cleaning pass
+  private var lastLogCleanTimeMs = -1L
+
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
@@ -83,6 +89,25 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
+  /**
+   * A background thread that periodically cleans event logs on disk.
+   *
+   * TODO: Add a mechanism to delete logs manually.
+   */
+  private val logCleaningThread = new Thread("LogCleaningThread") {
+    override def run() = Utils.logUncaughtExceptions {
+      while (true) {
+        val now = System.currentTimeMillis()
+        // If a cleaning pass is already overdue, run it immediately;
+        // otherwise sleep until the next scheduled pass.
+        if (now - lastLogCleanTimeMs < CLEAN_INTERVAL_MS) {
+          Thread.sleep(lastLogCleanTimeMs + CLEAN_INTERVAL_MS - now)
+        }
+        cleanLogs()
+      }
+    }
+  }
+
   initialize()
 
   private def initialize() {
@@ -100,6 +125,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     checkForLogs()
     logCheckingThread.setDaemon(true)
     logCheckingThread.start()
+
+    if (conf.getBoolean("spark.history.cleaner.enable", false)) {
+      cleanLogs()
+      logCleaningThread.setDaemon(true)
+      logCleaningThread.start()
+    }
   }
 
   override def getListing() = applications.values
@@ -214,6 +245,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
+  /**
+   * Delete event log directories that are older than the configured maximum age.
+   */
+  private def cleanLogs() = {
+    lastLogCleanTimeMs = System.currentTimeMillis()
+    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
+    try {
+      val logStatus = fs.listStatus(new Path(resolvedLogDir))
+      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+      val maxAge = conf.getLong("spark.history.fs.maxAge", 604800) * 1000
+      // Delete any application log directory not modified within the maximum age.
+      logDirs.foreach { dir =>
+        if (System.currentTimeMillis() - getModificationTime(dir) > maxAge) {
+          fs.delete(dir.getPath, true)
+        }
+      }
+    } catch {
+      case t: IOException => logError("Exception while cleaning logs", t)
+    }
+  }
+
   private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
     val path = logDir.getPath()
     val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
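
For context, a minimal sketch of how the cleaner introduced by this diff would be switched on. The property names are taken from the diff itself; the log directory URI is an assumed example, and in practice the history server reads these properties from its own configuration rather than from a SparkConf built in user code.

import org.apache.spark.SparkConf

object HistoryCleanerConfSketch {
  // Illustrative values only; property names come from the diff above.
  val conf = new SparkConf()
    .set("spark.history.fs.logDirectory", "hdfs://namenode/shared/spark-logs") // assumed example URI
    .set("spark.history.cleaner.enable", "true")    // the cleaner is off by default
    .set("spark.history.fs.cleanInterval", "86400") // seconds between cleaning passes (default: one day)
    .set("spark.history.fs.maxAge", "604800")       // seconds before a log is deleted (default: seven days)
}

Note that both interval properties are read with getLong in seconds and multiplied by 1000 to milliseconds internally, so the values above are whole seconds.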