From adcfe869863cd5f73b06143d41f138ea3b8d145f Mon Sep 17 00:00:00 2001 From: xukun 00228947 Date: Tue, 27 Jan 2015 09:30:41 +0800 Subject: [PATCH 1/7] Periodic cleanup event logs --- .../deploy/history/FsHistoryProvider.scala | 117 +++++++++++++----- docs/monitoring.md | 25 +++- 2 files changed, 107 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 0ae45f4ad913..ee6709fa11ae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,9 +17,13 @@ package org.apache.spark.deploy.history -import java.io.{BufferedInputStream, FileNotFoundException, InputStream} +import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream} +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.mutable +import scala.concurrent.duration.Duration + +import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.fs.permission.AccessControlException @@ -43,9 +47,31 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val NOT_STARTED = "" + // One day + private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds + + // One week + private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds + + private def warnUpdateInterval(key: String, value: String): String = { + logWarning(s"Using $key to set interval " + + "between each check for event log updates is deprecated, " + + "please use spark.history.fs.update.interval.seconds instead.") + value + } + // Interval between each check for event log updates - private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", - conf.getInt("spark.history.updateInterval", 10)) * 1000 + private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds") + .orElse(conf.getOption("spark.history.fs.updateInterval") + .map(warnUpdateInterval("spark.history.fs.updateInterval", _))) + .orElse(conf.getOption("spark.history.updateInterval") + .map(warnUpdateInterval("spark.history.updateInterval", _))) + .map(_.toInt) + .getOrElse(10) * 1000 + + // Interval between each cleaner checks for event logs to delete + private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds", + DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000 private val logDir = conf.getOption("spark.history.fs.logDirectory") .map { d => Utils.resolveURI(d).toString } @@ -53,8 +79,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task.. + private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat("spark-history-task-%d").setDaemon(true).build()) // The modification time of the newest log detected during the last scan. 
This is used // to ignore logs that are older during subsequent scans, to avoid processing data that @@ -73,27 +101,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" /** - * A background thread that periodically checks for event log updates on disk. - * - * If a log check is invoked manually in the middle of a period, this thread re-adjusts the - * time at which it performs the next log check to maintain the same period as before. - * - * TODO: Add a mechanism to update manually. + * A background thread that periodically do something about event log. */ - private val logCheckingThread = new Thread("LogCheckingThread") { - override def run() = Utils.logUncaughtExceptions { - while (true) { - val now = getMonotonicTimeMs() - if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) { - Thread.sleep(UPDATE_INTERVAL_MS) - } else { - // If the user has manually checked for logs recently, wait until - // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now) - } - checkForLogs() + private def getRunner(operateFun: () => Unit): Runnable = { + val runnable = new Runnable() { + override def run() = Utils.logUncaughtExceptions { + operateFun() } } + runnable } initialize() @@ -113,12 +129,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis "Logging directory specified is not a directory: %s".format(logDir)) } - checkForLogs() + // A task that periodically checks for event log updates on disk. + pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) - // Disable the background thread during tests. - if (!conf.contains("spark.testing")) { - logCheckingThread.setDaemon(true) - logCheckingThread.start() + if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) { + // A task that periodically cleans event logs on disk. + pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS, TimeUnit.MILLISECONDS) } } @@ -163,9 +179,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * applications that haven't been updated since last time the logs were checked. */ private[history] def checkForLogs(): Unit = { - lastLogCheckTimeMs = getMonotonicTimeMs() - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) - try { var newLastModifiedTime = lastModifiedTime val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) @@ -229,6 +242,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } + /** + * Delete event logs from the log directory according to the clean policy defined by the user. + */ + private def cleanLogs() = { + try { + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) + .getOrElse(Seq[FileStatus]()) + val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds", + DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000 + + val now = System.currentTimeMillis() + val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfNotExpire(info: FsApplicationHistoryInfo) = { + if (now - info.lastUpdated <= maxAge) { + newApps += (info.id -> info) + } + } + + val oldIterator = applications.values.iterator.buffered + oldIterator.foreach(addIfNotExpire) + + applications = newApps + + // Scan all logs from the log directory. 
+ // Only directories older than now maxAge milliseconds mill will be deleted + statusList.foreach { dir => + try { + if (now - getModificationTime(dir) > maxAge) { + fs.delete(dir.getPath, true) + } + } catch { + case t: IOException => logError(s"IOException in cleaning logs of $dir", t) + } + } + } catch { + case t: Exception => logError("Exception in cleaning logs", t) + } + } + /** * Replays the events in the specified log file and returns information about the associated * application. @@ -318,9 +370,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } - /** Returns the system's mononotically increasing time. */ - private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000) - /** * Return true when the application has completed. */ diff --git a/docs/monitoring.md b/docs/monitoring.md index f32cdef240d3..42bf49b06469 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -85,7 +85,7 @@ follows: - spark.history.fs.updateInterval + spark.history.fs.interval.seconds 10 The period, in seconds, at which information displayed by this history server is updated. @@ -144,6 +144,29 @@ follows: If disabled, no access control checks are made. + + spark.history.fs.cleaner.enable + false + + Specifies whether the History Server should periodically clean up event logs from storage. + + + + spark.history.fs.cleaner.interval.seconds + 86400 + + How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day). + Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds. + + + + spark.history.fs.cleaner.maxAge.seconds + 604800 + + Job history files older than this many seconds will be deleted when the history cleaner runs. + Defaults to 604800 (1 week). 
+ + Note that in all of these UIs, the tables are sortable by clicking their headers, From 70c28d671ee60bd04554b6709956df54aab8b64a Mon Sep 17 00:00:00 2001 From: xukun 00228947 Date: Thu, 29 Jan 2015 16:03:43 +0800 Subject: [PATCH 2/7] fix issues --- .../deploy/history/FsHistoryProvider.scala | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index ee6709fa11ae..f25406edf3ca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -60,12 +60,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis value } + private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] = { + conf.getOption(key).map(warnUpdateInterval(key, _)) + } + // Interval between each check for event log updates private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds") - .orElse(conf.getOption("spark.history.fs.updateInterval") - .map(warnUpdateInterval("spark.history.fs.updateInterval", _))) - .orElse(conf.getOption("spark.history.updateInterval") - .map(warnUpdateInterval("spark.history.updateInterval", _))) + .orElse(getDeprecatedConfig(conf, "spark.history.fs.updateInterval")) + .orElse(getDeprecatedConfig(conf, "spark.history.updateInterval")) .map(_.toInt) .getOrElse(10) * 1000 @@ -79,8 +81,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs - // and applications between check task and clean task.. + // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task. private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("spark-history-task-%d").setDaemon(true).build()) @@ -129,12 +131,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis "Logging directory specified is not a directory: %s".format(logDir)) } - // A task that periodically checks for event log updates on disk. - pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) + // Disable the background thread during tests. + if (!conf.contains("spark.testing")) { + // A task that periodically checks for event log updates on disk. + pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, + TimeUnit.MILLISECONDS) - if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) { - // A task that periodically cleans event logs on disk. - pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS, TimeUnit.MILLISECONDS) + if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) { + // A task that periodically cleans event logs on disk. 
+ pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS, + TimeUnit.MILLISECONDS) + } } } From 71782b5d506681ff51fd73550800e79e690c681c Mon Sep 17 00:00:00 2001 From: xukun 00228947 Date: Wed, 25 Feb 2015 16:41:00 +0800 Subject: [PATCH 3/7] fix issue --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 90b6b37de4e0..7f2f16c6c466 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -277,7 +277,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // Only directories older than now maxAge milliseconds mill will be deleted statusList.foreach { dir => try { - if (now - getModificationTime(dir) > maxAge) { + if (now - dir.getModificationTime() > maxAge) { fs.delete(dir.getPath, true) } } catch { From 373f3b942e088b53df6a3bf4547a27a0d4001621 Mon Sep 17 00:00:00 2001 From: xukun 00228947 Date: Wed, 25 Feb 2015 17:36:01 +0800 Subject: [PATCH 4/7] fix issue --- .../scala/org/apache/spark/SparkConf.scala | 6 +- .../deploy/history/FsHistoryProvider.scala | 56 ++++++++----------- docs/monitoring.md | 10 ++-- 3 files changed, 33 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0dbd26146cb1..249dfc04077a 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -362,7 +362,11 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst", "1.3"), DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3", - "Use spark.{driver,executor}.userClassPathFirst instead.")) + "Use spark.{driver,executor}.userClassPathFirst instead."), + DeprecatedConfig("spark.history.fs.updateInterval", "spark.history.fs.update.interval.seconds", + "1.3", "Use spark.history.fs.update.interval.seconds instead"), + DeprecatedConfig("spark.history.updateInterval", "spark.history.fs.update.interval.seconds", + "1.3", "Use spark.history.fs.update.interval.seconds instead")) configs.map { x => (x.oldName, x) }.toMap } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 7f2f16c6c466..bf7b9f97c00a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -47,27 +47,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val NOT_STARTED = "" - // One day - private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds - - // One week - private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds - - private def warnUpdateInterval(key: String, value: String): String = { - logWarning(s"Using $key to set interval " + - "between each check for event log updates is deprecated, " + - "please use spark.history.fs.update.interval.seconds instead.") - value - } - - private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] = { - 
conf.getOption(key).map(warnUpdateInterval(key, _)) - } - // Interval between each check for event log updates private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds") - .orElse(getDeprecatedConfig(conf, "spark.history.fs.updateInterval")) - .orElse(getDeprecatedConfig(conf, "spark.history.updateInterval")) + .orElse(Some(SparkConf.translateConfKey("spark.history.fs.updateInterval", true))) + .orElse(Some(SparkConf.translateConfKey("spark.history.updateInterval", true))) .map(_.toInt) .getOrElse(10) * 1000 @@ -81,6 +64,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) + // Used by check event thread and clean log thread. // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs // and applications between check task and clean task. private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() @@ -103,15 +87,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" /** - * A background thread that periodically do something about event log. + * Return a runnable that performs the given operation on the event logs. + * This operation is expected to be executed periodically. */ private def getRunner(operateFun: () => Unit): Runnable = { - val runnable = new Runnable() { + new Runnable() { override def run() = Utils.logUncaughtExceptions { operateFun() } } - runnable } initialize() @@ -137,7 +121,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) - if (conf.getBoolean("spark.history.fs.cleaner.enable", false)) { + if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { // A task that periodically cleans event logs on disk. pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS, TimeUnit.MILLISECONDS) @@ -253,7 +237,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis /** * Delete event logs from the log directory according to the clean policy defined by the user. */ - private def cleanLogs() = { + private def cleanLogs(): Unit = { try { val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) .getOrElse(Seq[FileStatus]()) @@ -261,27 +245,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000 val now = System.currentTimeMillis() - val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfNotExpire(info: FsApplicationHistoryInfo) = { + val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + + applications.values.foreach { if (now - info.lastUpdated <= maxAge) { - newApps += (info.id -> info) + appsToRetain += (info.id -> info) } } - val oldIterator = applications.values.iterator.buffered - oldIterator.foreach(addIfNotExpire) - - applications = newApps + applications = appsToRetain // Scan all logs from the log directory. 
- // Only directories older than now maxAge milliseconds mill will be deleted + // Only directories older than the specified max age will be deleted statusList.foreach { dir => try { if (now - dir.getModificationTime() > maxAge) { + // if path is a directory and set to true, + // the directory is deleted else throws an exception fs.delete(dir.getPath, true) } } catch { - case t: IOException => logError(s"IOException in cleaning logs of $dir", t) + case t: IOException => logError(s"IOException in cleaning logs of $dir", t) } } } catch { @@ -410,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + // One day + val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds + + // One week + val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds } private class FsApplicationHistoryInfo( diff --git a/docs/monitoring.md b/docs/monitoring.md index 35a30a826466..37ede476c187 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -86,7 +86,7 @@ follows: - spark.history.fs.interval.seconds + spark.history.fs.update.interval.seconds 10 The period, in seconds, at which information displayed by this history server is updated. @@ -146,7 +146,7 @@ follows: - spark.history.fs.cleaner.enable + spark.history.fs.cleaner.enabled false Specifies whether the History Server should periodically clean up event logs from storage. @@ -156,16 +156,16 @@ follows: spark.history.fs.cleaner.interval.seconds 86400 - How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day). + How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day). Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds. spark.history.fs.cleaner.maxAge.seconds - 604800 + 3600 * 24 * 7 Job history files older than this many seconds will be deleted when the history cleaner runs. - Defaults to 604800 (1 week). + Defaults to 3600 * 24 * 7 (1 week). 
From 6e3d06b0f282585f45eb72fa3b03a09ffe343729 Mon Sep 17 00:00:00 2001
From: xukun 00228947
Date: Wed, 25 Feb 2015 17:40:30 +0800
Subject: [PATCH 5/7] fix issue

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bf7b9f97c00a..cbec1a849640 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

-      applications.values.foreach {
+      applications.values.foreach { info =>
         if (now - info.lastUpdated <= maxAge) {
           appsToRetain += (info.id -> info)
         }

From 31674ee308689c29da96c0325adb2774bf72c353 Mon Sep 17 00:00:00 2001
From: xukun 00228947
Date: Thu, 26 Feb 2015 09:02:25 +0800
Subject: [PATCH 6/7] fix issue

---
 core/src/main/scala/org/apache/spark/SparkConf.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 249dfc04077a..0f4922ab4e31 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
       "1.3"),
     DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
       "Use spark.{driver,executor}.userClassPathFirst instead."),
-    DeprecatedConfig("spark.history.fs.updateInterval", "spark.history.fs.update.interval.seconds",
+    DeprecatedConfig("spark.history.fs.updateInterval",
+      "spark.history.fs.update.interval.seconds",
       "1.3", "Use spark.history.fs.update.interval.seconds instead"),
-    DeprecatedConfig("spark.history.updateInterval", "spark.history.fs.update.interval.seconds",
+    DeprecatedConfig("spark.history.updateInterval",
+      "spark.history.fs.update.interval.seconds",
       "1.3", "Use spark.history.fs.update.interval.seconds instead"))
     configs.map { x => (x.oldName, x) }.toMap
   }

From 7a5b9c5c5337bd6a445b5a6551bb5bff6201f066 Mon Sep 17 00:00:00 2001
From: xukun 00228947
Date: Thu, 26 Feb 2015 11:52:41 +0800
Subject: [PATCH 7/7] fix issue

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index cbec1a849640..1aaa7b72735a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
   private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
-    .orElse(Some(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
-    .orElse(Some(SparkConf.translateConfKey("spark.history.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
     .map(_.toInt)
     .getOrElse(10) * 1000
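
The mechanism the whole series hangs off is a single scheduled thread pool that runs both the periodic log scan and the periodic cleanup, replacing the hand-rolled LogCheckingThread from before PATCH 1. Below is a self-contained sketch of that pattern, not Spark code: the object name, the intervals and the println bodies are placeholders, and Guava's ThreadFactoryBuilder is swapped for a plain ThreadFactory so the snippet needs only the JDK.

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    object SingleThreadedSchedulerSketch {
      // Daemon threads so the scheduler never keeps the JVM alive on its own.
      private val daemonFactory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "history-task")
          t.setDaemon(true)
          t
        }
      }

      // Pool size of one is the point: the scan task and the clean task are serialized.
      private val pool = Executors.newSingleThreadScheduledExecutor(daemonFactory)

      // Wrap an operation so an uncaught exception does not kill the scheduler thread.
      private def getRunner(operateFun: () => Unit): Runnable = new Runnable {
        override def run(): Unit =
          try operateFun() catch { case e: Exception => e.printStackTrace() }
      }

      private def checkForLogs(): Unit = println("scanning the event log directory for updates")
      private def cleanLogs(): Unit = println("deleting event logs older than the configured max age")

      def main(args: Array[String]): Unit = {
        val updateIntervalMs = 10 * 1000L // stands in for spark.history.fs.update.interval.seconds
        val cleanIntervalMs = 60 * 1000L  // shortened from the daily default so the demo shows both tasks
        pool.scheduleAtFixedRate(getRunner(() => checkForLogs()), 0, updateIntervalMs, TimeUnit.MILLISECONDS)
        pool.scheduleAtFixedRate(getRunner(() => cleanLogs()), 0, cleanIntervalMs, TimeUnit.MILLISECONDS)
        Thread.sleep(65 * 1000)           // keep the daemon-threaded demo alive long enough to see both run
      }
    }

Keeping the pool at one thread means checkForLogs() and cleanLogs() can share the application listing without extra locking, which is exactly the concern the comment added in PATCH 1 calls out.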
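
To see how the settings documented in monitoring.md fit together once the series is applied, here is an illustrative sketch, again not part of the patches: the log directory path is invented, and in a real deployment these keys would normally live in the history server's configuration file rather than be set programmatically.

    import org.apache.spark.SparkConf

    object HistoryCleanerConfExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.history.fs.logDirectory", "hdfs:///spark-events")        // example path only
          .set("spark.history.fs.update.interval.seconds", "10")               // scan for new logs every 10s
          .set("spark.history.fs.cleaner.enabled", "true")                     // cleaner is off by default
          .set("spark.history.fs.cleaner.interval.seconds", "86400")           // run the cleaner once a day
          .set("spark.history.fs.cleaner.maxAge.seconds", (7 * 24 * 3600).toString) // keep one week of logs
        println(conf.toDebugString)
      }
    }

With these values the provider scans for new event logs every ten seconds and, once a day, deletes anything older than a week; apart from switching the cleaner on, they match the documented defaults.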