From bdb5955b7e0c9a1eeee886021567d89ec876da9e Mon Sep 17 00:00:00 2001
From: Rohit Agarwal
Date: Wed, 12 Aug 2015 17:20:53 -0700
Subject: [PATCH 1/4] [SPARK-9924] [WEB UI] Don't schedule checkForLogs while
 some of them are already running.

---
 .../spark/deploy/history/FsHistoryProvider.scala | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 53c18ca3ff50c..a025e12287f45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.history

 import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.{Future, ExecutorService, Executors, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.mutable
@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
-      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
+      pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)

       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
-        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
+        pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
     }
   }
@@ -204,13 +204,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           mod1 >= mod2
         }

+      val tasks = mutable.ListBuffer.empty[Future[_]]
       logInfos.sliding(20, 20).foreach { batch =>
-        replayExecutor.submit(new Runnable {
+        tasks += replayExecutor.submit(new Runnable {
           override def run(): Unit = mergeApplicationListing(batch)
         })
       }

       lastModifiedTime = newLastModifiedTime
+
+      for (task <- tasks) {
+        // Wait for all tasks to finish. This makes sure that checkForLogs is
+        // not scheduled again while some tasks are already running in the
+        // replayExecutor.
+        task.get()
+      }
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
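Why the scheduler change in patch 1/4 helps: scheduleAtFixedRate measures the period from the start of one run to the start of the next, so a checkForLogs pass that overruns its interval is followed immediately by another pass, and slow passes pile up back to back. scheduleWithFixedDelay only starts counting the delay once the previous run has finished. A minimal standalone sketch of the difference, outside Spark (the SlowTickDemo name and timings are illustrative, not from the patch):

    import java.util.concurrent.{Executors, TimeUnit}

    // Standalone sketch; names and timings are illustrative, not Spark code.
    object SlowTickDemo {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newScheduledThreadPool(1)
        // With scheduleAtFixedRate(task, 0, 1, SECONDS) a 3s task runs back
        // to back, because missed 1s deadlines are made up immediately.
        // With scheduleWithFixedDelay the next run starts 1s after the
        // previous one *finishes*, so ticks land roughly 4s apart.
        pool.scheduleWithFixedDelay(new Runnable {
          override def run(): Unit = {
            println(s"tick at ${System.currentTimeMillis() / 1000}")
            Thread.sleep(3000) // simulate a slow checkForLogs pass
          }
        }, 0, 1, TimeUnit.SECONDS)
        Thread.sleep(12000) // observe a few ticks, then shut down
        pool.shutdownNow()
      }
    }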
From 249f4ef3b6fdc9f81ccd35660e1d05a0b5c9083e Mon Sep 17 00:00:00 2001
From: Rohit Agarwal
Date: Thu, 13 Aug 2015 10:26:45 -0700
Subject: [PATCH 2/4] [SPARK-9924] [WEB UI] Avoid mutability. Use grouped(x)
 instead of sliding(x, x).

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a025e12287f45..dfc0eb9aaf3a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -204,9 +204,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           mod1 >= mod2
         }

-      val tasks = mutable.ListBuffer.empty[Future[_]]
-      logInfos.sliding(20, 20).foreach { batch =>
-        tasks += replayExecutor.submit(new Runnable {
+      val tasks: Iterator[Future[_]] = logInfos.grouped(20).map { batch =>
+        replayExecutor.submit(new Runnable {
           override def run(): Unit = mergeApplicationListing(batch)
         })
       }

From 3e22b6c423c632e67deac34ae10174ef36db5acd Mon Sep 17 00:00:00 2001
From: Rohit Agarwal
Date: Fri, 14 Aug 2015 01:40:02 -0700
Subject: [PATCH 3/4] [SPARK-9924] [WEB UI] Catch exceptions to avoid
 reverting to the old behavior of piling up executions.

---
 .../deploy/history/FsHistoryProvider.scala | 31 ++++++++++++-------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index dfc0eb9aaf3a6..3b178dd492f85 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -204,20 +204,27 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           mod1 >= mod2
         }

-      val tasks: Iterator[Future[_]] = logInfos.grouped(20).map { batch =>
-        replayExecutor.submit(new Runnable {
-          override def run(): Unit = mergeApplicationListing(batch)
-        })
-      }
+      logInfos.grouped(20)
+        .map { batch =>
+          replayExecutor.submit(new Runnable {
+            override def run(): Unit = mergeApplicationListing(batch)
+          })
+        }
+        .foreach { task =>
+          try {
+            // Wait for all tasks to finish. This makes sure that checkForLogs
+            // is not scheduled again while some tasks are already running in
+            // the replayExecutor.
+            task.get()
+          } catch {
+            case e: InterruptedException =>
+              throw e
+            case e: Exception =>
+              logError("Exception while merging application listings", e)
+          }
+        }

       lastModifiedTime = newLastModifiedTime
-
-      for (task <- tasks) {
-        // Wait for all tasks to finish. This makes sure that checkForLogs is
-        // not scheduled again while some tasks are already running in the
-        // replayExecutor.
-        task.get()
-      }
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
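Two details worth spelling out in patches 2/4 and 3/4 above. First, grouped(n) is the idiomatic spelling of sliding(n, n): both yield non-overlapping batches, with a shorter final batch if the input does not divide evenly. Second, wrapping each task.get() in its own try/catch means one failed batch is logged and skipped rather than aborting the wait for the remaining batches, which (per the 3/4 commit message) would have reverted to the old behavior of executions piling up. A rough standalone sketch of the combined pattern (BatchDemo and the println bodies are illustrative stand-ins, not Spark code):

    import java.util.concurrent.{Executors, Future}

    // Standalone sketch; names and batch contents are illustrative, not Spark code.
    object BatchDemo {
      def main(args: Array[String]): Unit = {
        val items = (1 to 50).toList
        // grouped(20) and sliding(20, 20) produce the same non-overlapping
        // batches: [1..20], [21..40], [41..50].
        assert(items.grouped(20).toList == items.sliding(20, 20).toList)

        val executor = Executors.newFixedThreadPool(2)
        // tasks is a lazy Iterator: each submit happens only when foreach
        // pulls the next element, i.e. after the previous get() returns.
        val tasks: Iterator[Future[_]] = items.grouped(20).map { batch =>
          executor.submit(new Runnable {
            override def run(): Unit = println(s"merged batch of ${batch.size}")
          })
        }
        tasks.foreach { task =>
          try {
            task.get() // block until this batch is done
          } catch {
            case e: InterruptedException => throw e // see the sketch after patch 4/4
            case e: Exception => println(s"batch failed, continuing: $e")
          }
        }
        executor.shutdown()
      }
    }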
From cd1ef90cd34b27d7c2cd98b54bceeb5936ce60e9 Mon Sep 17 00:00:00 2001
From: Rohit Agarwal
Date: Fri, 14 Aug 2015 02:01:01 -0700
Subject: [PATCH 4/4] [SPARK-9924] [WEB UI] Remove unneeded import.

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3b178dd492f85..e573ff16c50a3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.history

 import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
-import java.util.concurrent.{Future, ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.mutable
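Finally, on why patch 3/4 rethrows InterruptedException instead of folding it into the generic Exception case: Future.get signals interruption of the waiting thread (for example, when the pool is being shut down) by throwing InterruptedException, and swallowing it would leave checkForLogs blocking on the remaining futures during shutdown. A small self-contained sketch of that behavior (InterruptDemo and its timings are illustrative, not Spark code):

    import java.util.concurrent.Executors

    // Standalone sketch; names and timings are illustrative, not Spark code.
    object InterruptDemo {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newSingleThreadExecutor()
        // Two stand-ins for slow mergeApplicationListing batches.
        val tasks = Seq.fill(2) {
          executor.submit(new Runnable {
            override def run(): Unit = Thread.sleep(60000)
          })
        }

        val waiter = new Thread(new Runnable {
          override def run(): Unit =
            try {
              tasks.foreach { task =>
                try {
                  task.get()
                } catch {
                  case e: InterruptedException => throw e // escape the loop
                  case e: Exception => println(s"task failed, continuing: $e")
                }
              }
            } catch {
              case _: InterruptedException =>
                // Had the inner loop swallowed the interrupt, this thread
                // would still be blocked on the second future.
                println("interrupted; abandoning this pass")
            }
        })
        waiter.start()
        Thread.sleep(500)
        waiter.interrupt() // e.g. the history server shutting down
        waiter.join()
        executor.shutdownNow()
      }
    }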