Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
None
}
}
.sortBy { info => (-info.endTime, -info.startTime) }
.sortWith(compareAppInfo)

lastModifiedTime = newLastModifiedTime

Expand All @@ -214,7 +214,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val newIterator = logInfos.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (newIterator.head.endTime > oldIterator.head.endTime) {
if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next)
} else {
addIfAbsent(oldIterator.next)
Expand All @@ -230,6 +230,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}

/**
* Comparison function that defines the sort order for the application listing.
*
* @return Whether `i1` should precede `i2`.
*/
private def compareAppInfo(
i1: FsApplicationHistoryInfo,
i2: FsApplicationHistoryInfo): Boolean = {
if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't imagine it probably matters, but this should be > instead of >= I think? or else this says that two equal intervals should both precede the other. I bet it doesn't make the sort wrong, just might affect whether the sort is stable, but who cares here.

}

/**
* Replays the events in the specified log file and returns information about the associated
* application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,54 +37,50 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

private var testDir: File = null

private var provider: FsHistoryProvider = null

before {
testDir = Utils.createTempDir()
provider = new FsHistoryProvider(new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.history.fs.updateInterval", "0"))
}

after {
Utils.deleteRecursively(testDir)
}

test("Parse new and old application logs") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.history.fs.updateInterval", "0")
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())

// Write a new-style application log.
val logFile1 = new File(testDir, "new1")
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test"),
SparkListenerApplicationEnd(2L)
val newAppComplete = new File(testDir, "new1")
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
SparkListenerApplicationEnd(4L)
)

// Write an unfinished app, new-style.
val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app2-2", None, 1L, "test")
val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)

// Write an old-style application log.
val oldLog = new File(testDir, "old1")
oldLog.mkdir()
createEmptyFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("app3", None, 2L, "test"),
val oldAppComplete = new File(testDir, "old1")
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldLog, provider.APPLICATION_COMPLETE))
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))

// Check for logs so that we force the older unfinished app to be loaded, to make
// sure unfinished apps are also sorted correctly.
provider.checkForLogs()

// Write an unfinished app, old-style.
val oldLog2 = new File(testDir, "old2")
oldLog2.mkdir()
createEmptyFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("app4", None, 2L, "test")
val oldAppIncomplete = new File(testDir, "old2")
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
)

// Force a reload of data from the log directory, and check that both logs are loaded.
Expand All @@ -96,14 +92,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (4)
list.count(e => e.completed) should be (2)

list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
oldLog.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
logFile1.lastModified(), "test", true))
list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
oldLog2.lastModified(), "test", false))
list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
logFile2.lastModified(), "test", false))
list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
newAppComplete.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
-1L, oldAppIncomplete.lastModified(), "test", false))
list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
-1L, newAppIncomplete.lastModified(), "test", false))

// Make sure the UI can be rendered.
list.foreach { case info =>
Expand All @@ -113,6 +109,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}

test("Parse legacy logs with compression codec set") {
val provider = new FsHistoryProvider(createTestConf())
val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
(classOf[SnappyCompressionCodec].getName(), true),
("invalid.codec", false))
Expand Down Expand Up @@ -156,10 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)
logFile2.setReadable(false, false)

val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.history.fs.updateInterval", "0")
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())
provider.checkForLogs()

val list = provider.getListing().toSeq
Expand All @@ -168,10 +162,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}

test("history file is renamed from inprogress to completed") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.testing", "true")
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())

val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
Expand All @@ -191,9 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}

test("SPARK-5582: empty log directory") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())

val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
Expand Down Expand Up @@ -229,4 +218,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
new FileOutputStream(file).close()
}

private def createTestConf(): SparkConf = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could/should this be method be reused in more places now that your other change to this test is merged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll merge with master and update the PR.

new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
}

}