
Commit caeac28

Author: Eric Vandenberg (committed)
[SPARK-21447][WEB UI] Spark history server fails to render a compressed inprogress history file in some cases.

Add failure handling for the EOFException that can be thrown while decompressing an inprogress Spark history file, treating it the same as the case where the last line can't be parsed.
1 parent d3f4a21 commit caeac28
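
The failure mode can be reproduced in isolation. The following sketch is illustrative and not part of the commit; it assumes Spark's LZ4CompressionCodec (a public DeveloperApi class) is on the classpath. It compresses a few event lines, drops the tail of the compressed bytes the way an unfinished writer would, and reads the result back, which is where the decompressor can raise the EOFException this patch tolerates.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException}

import scala.io.Source

import org.apache.spark.SparkConf
import org.apache.spark.io.LZ4CompressionCodec

object TruncatedFrameDemo {
  def main(args: Array[String]): Unit = {
    val codec = new LZ4CompressionCodec(new SparkConf())
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write(("{\"Event\":\"SparkListenerLogStart\"}\n" * 1000).getBytes("UTF-8"))
    out.close()

    // Drop the tail of the compressed bytes, as if the history file were
    // still being written when the history server read it.
    val truncated = buffer.toByteArray.dropRight(10)
    val in = codec.compressedInputStream(new ByteArrayInputStream(truncated))
    try {
      Source.fromInputStream(in).getLines().foreach(_ => ())
    } catch {
      case _: EOFException =>
        // Before this fix, ReplayListenerBus let this escape and the
        // history server failed to render the application.
        println("EOFException on an incomplete compression frame")
    }
  }
}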

File tree: 2 files changed, +86 −3 lines changed


core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala (11 additions, 1 deletion)

@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{InputStream, IOException}
+import java.io.{EOFException, InputStream, IOException}
 
 import scala.io.Source
 
@@ -107,6 +107,16 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
         }
       }
     } catch {
+      case eofe: EOFException =>
+        // If the history event file is compressed and in progress, the decompressor will throw an
+        // EOFException when there is not enough data to decompress a complete frame. This indicates
+        // we're at the end of the file, so we treat it like the JsonParseException case above.
+        if (!maybeTruncated) {
+          throw eofe
+        } else {
+          logWarning(s"Got EOFException from log file $sourceName" +
+            s" at line $lineNumber, the file might not have finished writing cleanly.")
+        }
       case ioe: IOException =>
         throw ioe
       case e: Exception =>
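
For context on how the maybeTruncated flag reaches this code: ReplayListenerBus.replay is private[spark] and is driven by the history server's log reader. A hedged sketch of such a caller follows; the helper name replayLog and the suffix check are illustrative, not from this commit.

import java.io.InputStream

import org.apache.spark.scheduler.ReplayListenerBus

// Illustrative internal caller (not from this commit): truncation is
// tolerated only for logs that may still be in flight, mirroring how the
// history server treats ".inprogress" files.
def replayLog(bus: ReplayListenerBus, in: InputStream, sourceName: String): Unit = {
  // With maybeTruncated = true, both a partial trailing JSON line
  // (JsonParseException) and a partial compression frame (the new
  // EOFException case) are logged and skipped instead of aborting replay.
  bus.replay(in, sourceName, maybeTruncated = sourceName.endsWith(".inprogress"))
}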

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala (75 additions, 2 deletions)

@@ -17,15 +17,16 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{File, PrintWriter}
+import java.io._
 import java.net.URI
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
+import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
 import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
 
 /**
@@ -72,6 +73,59 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
+  /**
+   * Test replaying a compressed Spark history file that internally throws an EOFException. To
+   * avoid sensitivity to compression specifics, the test forces an EOFException while reading
+   * bytes from the underlying stream (as observed with some actual history files) and checks
+   * the failure handling. This validates correctness in both cases: when maybeTruncated is
+   * true and when it is false.
+   */
+  test("Replay compressed inprogress log file succeeding on partial read") {
+    val buffered = new ByteArrayOutputStream
+    val codec = new LZ4CompressionCodec(new SparkConf())
+    val compstream = codec.compressedOutputStream(buffered)
+    val writer = new PrintWriter(compstream)
+
+    val applicationStart = SparkListenerApplicationStart("AppStarts", None,
+      125L, "Mickey", None)
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // scalastyle:off println
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    // scalastyle:on println
+    writer.close()
+
+    val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
+    val fstream = fileSystem.create(logFilePath)
+    val bytes = buffered.toByteArray
+
+    fstream.write(bytes, 0, buffered.size)
+    fstream.close()
+
+    // Read the compressed .inprogress file and verify only the first event was parsed.
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val replayer = new ReplayListenerBus()
+
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+
+    // Verify the replay returns the events, given the input may be truncated.
+    val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10)
+    replayer.replay(failingStream, logFilePath.toString, true)
+
+    assert(eventMonster.loggedEvents.size === 1)
+    assert(failingStream.didFail)
+
+    // Verify the replay throws the EOFException since the input may not be truncated.
+    val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10)
+    intercept[EOFException] {
+      replayer.replay(failingStream2, logFilePath.toString, false)
+    }
+  }
+
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()
@@ -156,4 +210,23 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     override def start() { }
 
   }
+
+  /*
+   * This is a dummy input stream that wraps another input stream but ends prematurely when
+   * reading at the specified position, throwing an EOFException.
+   */
+  private class EarlyEOFInputStream(in: InputStream, failAtPos: Int) extends InputStream {
+    private val countDown = new AtomicInteger(failAtPos)
+
+    def didFail: Boolean = countDown.get == 0
+
+    @throws[IOException]
+    def read(): Int = {
+      if (countDown.get == 0) {
+        throw new EOFException("Stream ended prematurely")
+      }
+      countDown.decrementAndGet()
+      in.read()
+    }
+  }
 }
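
One subtlety in the test harness: EarlyEOFInputStream throws only on the read after its byte budget is spent, and didFail flips to true as soon as the countdown reaches zero, one call before the exception actually fires. An illustrative check of that contract, not part of the commit (it assumes ScalaTest's intercept, as used elsewhere in this suite):

val src = new ByteArrayInputStream(Array.fill[Byte](16)(1))
val s = new EarlyEOFInputStream(src, failAtPos = 4)
(1 to 4).foreach(_ => s.read())       // the four budgeted reads succeed
assert(s.didFail)                     // budget exhausted; flag is already set
intercept[EOFException] { s.read() }  // the fifth read throws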
