Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.history

import java.io.FileNotFoundException
import java.io.{IOException, FileNotFoundException}

import scala.collection.mutable

Expand All @@ -38,6 +38,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000

// Interval between each clean for event logs
private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleanInterval", 86400) * 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is 86400? Could you use Duration instead so that it's clearer what the default value means?


private val logDir = conf.get("spark.history.fs.logDirectory", null)
private val resolvedLogDir = Option(logDir)
.map { d => Utils.resolveURI(d) }
Expand All @@ -54,6 +57,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// is already known.
private var lastModifiedTime = -1L

// A timestamp of when the disk was last clean for logs
private var lastLogCleanTimeMs = -1L

// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
Expand Down Expand Up @@ -83,6 +89,25 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}

/**
* A background thread that periodically clean event log on disk.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: cleans, logs

*
* TODO: Add a mechanism to delete manually.
*/
private val logCleaningThread = new Thread("LogCleaningThread") {
override def run() = Utils.logUncaughtExceptions {
while (true) {
val now = System.currentTimeMillis()
if (now - lastLogCleanTimeMs > CLEAN_INTERVAL_MS) {
Thread.sleep(CLEAN_INTERVAL_MS)
} else {
Thread.sleep(lastLogCleanTimeMs + CLEAN_INTERVAL_MS - now)
}
cleanLogs()
}
}
}

initialize()

private def initialize() {
Expand All @@ -100,6 +125,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
checkForLogs()
logCheckingThread.setDaemon(true)
logCheckingThread.start()

if(conf.getBoolean("spark.history.cleaner.enable", false)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if (

Also, conf option should start with spark.history.fs..

cleanLogs()
logCleaningThread.setDaemon(true)
logCleaningThread.start()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure you don't want to clean logs on initialization. Otherwise your history server will always display nothing!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,you are right. it is no need to clean it on initialization. And i can not change it, because of unknown repository. And i will submit other commit.

}

override def getListing() = applications.values
Expand Down Expand Up @@ -214,6 +245,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}

/**
* Deleting apps if setting cleaner.
*/
private def cleanLogs() = {
lastLogCleanTimeMs = System.currentTimeMillis()
logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
try {
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val maxAge = conf.getLong("spark.history.fs.maxAge", 604800) * 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, could you use a Duration instead of 604800?

logDirs.foreach{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: foreach {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, usually we use this style for this kind of code:

logDirs.foreach { dir =>
  blah
}

case dir: FileStatus =>
if(System.currentTimeMillis() - getModificationTime(dir) > maxAge) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if (

fs.delete(dir.getPath, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be better to try each delete separately; if someone writes a directory with wrong permissions to the log dir, they may cause the cleaner thread to never clean up anything.

(That would be better if there were a more descriptive exception for those errors, but I think all FileSystem will throw are IOExceptions and InterruptedExceptions.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileSystem.listStatus() will throw FileNotFoundException and IOException; FileSystem.delete will throw IOException. And I will change it.

}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this line, and the 2 following it, should be indented 2 less spaces.

} catch {
case t: IOException => logError("Exception in cleaning logs", t)
}
}

private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
val path = logDir.getPath()
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
Expand Down