diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index fbe39b27649f..553bf3cb945a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo( startTime: Long, endTime: Long, lastUpdated: Long, - sparkUser: String) + sparkUser: String, + completed: Boolean = false) private[spark] abstract class ApplicationHistoryProvider { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 792d15b99ea0..bd8e5492c6eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val logInfos = statusList .filter { entry => try { - val isFinishedApplication = - if (isLegacyLogDirectory(entry)) { - fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) - } else { - !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) - } - - if (isFinishedApplication) { - val modTime = getModificationTime(entry) - newLastModifiedTime = math.max(newLastModifiedTime, modTime) - modTime >= lastModifiedTime - } else { - false - } + val modTime = getModificationTime(entry) + newLastModifiedTime = math.max(newLastModifiedTime, modTime) + modTime >= lastModifiedTime } catch { case e: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on @@ -204,7 +193,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis None } } - .sortBy { info => -info.endTime } + .sortBy { info => (-info.endTime, -info.startTime) } lastModifiedTime = newLastModifiedTime @@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), getModificationTime(eventLog), - appListener.sparkUser.getOrElse(NOT_STARTED)) + appListener.sparkUser.getOrElse(NOT_STARTED), + isApplicationCompleted(eventLog)) } finally { logInput.close() } @@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis /** Returns the system's mononotically increasing time. */ private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000) + /** + * Return true when the application already completed. + */ + private def isApplicationCompleted(entry: FileStatus): Boolean = { + if (isLegacyLogDirectory(entry)) { + fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) + } else { + !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) + } + } + } private object FsHistoryProvider { @@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo( startTime: Long, endTime: Long, lastUpdated: Long, - sparkUser: String) - extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser) + sparkUser: String, + completed: Boolean = true) + extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 0d5dcfb1ddff..e4e7bc221601 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize + val requestedIncomplete = + Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean - val allApps = parent.getApplicationList() + val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete) val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size)) @@ -65,25 +67,26 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {