From dcae69b351ac71bf1554b760aeea6b763b2bf4c0 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 25 Feb 2017 11:46:35 +0800 Subject: [PATCH 1/3] Fix --- .../spark/sql/kafka010/KafkaSource.scala | 15 +++---- .../spark/sql/kafka010/KafkaSourceSuite.scala | 12 +++++- .../streaming/CompactibleFileStreamLog.scala | 9 ++-- .../streaming/FileStreamSinkLog.scala | 4 +- .../streaming/FileStreamSourceLog.scala | 4 +- .../execution/streaming/HDFSMetadataLog.scala | 37 ++++++++++++++++ .../execution/streaming/OffsetSeqLog.scala | 10 ++--- .../CompactibleFileStreamLogSuite.scala | 42 ++++++++++++++++--- .../streaming/FileStreamSinkLogSuite.scala | 8 ++-- .../streaming/HDFSMetadataLogSuite.scala | 28 +++++++++++++ .../streaming/OffsetSeqLogSuite.scala | 20 +++++++++ 11 files changed, 154 insertions(+), 35 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 92b5d91ba435..a9addaf3b03c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -100,7 +100,8 @@ private[kafka010] class KafkaSource( override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517) val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) - writer.write(VERSION) + writer.write("v" + VERSION) + writer.write("\n") writer.write(metadata.json) writer.flush } @@ -111,13 +112,13 @@ private[kafka010] class KafkaSource( // HDFSMetadataLog guarantees that it never creates a partial file. assert(content.length != 0) if (content(0) == 'v') { - if (content.startsWith(VERSION)) { - KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length))) + val indexOfNewLine = content.indexOf("\n") + if (indexOfNewLine > 0) { + val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) + KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) } else { - val versionInFile = content.substring(0, content.indexOf("\n")) throw new IllegalStateException( - s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " + - s"but was $versionInFile. Please upgrade your Spark.") + s"Log file was malformed: failed to detect the log file version line.") } } else { // The log was generated by Spark 2.1.0 @@ -351,7 +352,7 @@ private[kafka010] object KafkaSource { | source option "failOnDataLoss" to "false". 
""".stripMargin - private val VERSION = "v1\n" + private[kafka010] val VERSION = 1 def getSortedExecutorList(sc: SparkContext): Array[String] = { val bm = sc.env.blockManager diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 534fb77c9ce1..5db3422c697f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -204,7 +204,7 @@ class KafkaSourceSuite extends KafkaSourceTest { override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { out.write(0) val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8)) - writer.write(s"v0\n${metadata.json}") + writer.write(s"v99999\n${metadata.json}") writer.flush } } @@ -226,7 +226,15 @@ class KafkaSourceSuite extends KafkaSourceTest { source.getOffset.get // Read initial offset } - assert(e.getMessage.contains("Please upgrade your Spark")) + Seq( + s"Failed to read log file ${metadataPath.getCanonicalPath}/0", + s"UnsupportedLogVersion: maximum supported log version is v${KafkaSource.VERSION}, but " + + s"encountered v99999", + "The log file was produced by a newer version of Spark and cannot be read by this version", + "Please upgrade" + ).foreach { message => + assert(e.getMessage.contains(message)) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 5a6f9e87f6ea..408c8f81f17b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession * doing a compaction, it will read all old log files and merge them with the new batch. 
*/ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( - metadataLogVersion: String, + metadataLogVersion: Int, sparkSession: SparkSession, path: String) extends HDFSMetadataLog[Array[T]](sparkSession, path) { @@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( override def serialize(logData: Array[T], out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller - out.write(metadataLogVersion.getBytes(UTF_8)) + out.write(("v" + metadataLogVersion).getBytes(UTF_8)) logData.foreach { data => out.write('\n') out.write(Serialization.write(data).getBytes(UTF_8)) @@ -146,10 +146,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") } - val version = lines.next() - if (version != metadataLogVersion) { - throw new IllegalStateException(s"Unknown log version: ${version}") - } + val version = parseVersion(lines.next(), metadataLogVersion) lines.map(Serialization.read[T]).toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index eb6eed87eca7..8d718b2164d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -77,7 +77,7 @@ object SinkFileStatus { * (drops the deleted files). */ class FileStreamSinkLog( - metadataLogVersion: String, + metadataLogVersion: Int, sparkSession: SparkSession, path: String) extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) { @@ -106,7 +106,7 @@ class FileStreamSinkLog( } object FileStreamSinkLog { - val VERSION = "v1" + val VERSION = 1 val DELETE_ACTION = "delete" val ADD_ACTION = "add" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 81908c0cefdf..33e6a1d5d6e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry import org.apache.spark.sql.internal.SQLConf class FileStreamSourceLog( - metadataLogVersion: String, + metadataLogVersion: Int, sparkSession: SparkSession, path: String) extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { @@ -120,5 +120,5 @@ class FileStreamSourceLog( } object FileStreamSourceLog { - val VERSION = "v1" + val VERSION = 1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index f9e1f7de9ec0..4837409cad18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming import java.io._ +import java.lang.IllegalStateException import java.nio.charset.StandardCharsets import java.util.{ConcurrentModificationException, EnumSet, UUID} @@ -195,6 +196,11 @@ class HDFSMetadataLog[T <: AnyRef 
: ClassTag](sparkSession: SparkSession, path: val input = fileManager.open(batchMetadataFile) try { Some(deserialize(input)) + } catch { + case ise: IllegalStateException => + // re-throw the exception with the log file path added + throw new IllegalStateException( + s"Failed to read log file $batchMetadataFile. ${ise.getMessage}") } finally { IOUtils.closeQuietly(input) } @@ -268,6 +274,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: new FileSystemManager(metadataPath, hadoopConf) } } + + /** + * Parse the log version from the given `text` -- will throw exception when the parsed version + * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1", + * "v123xyz" etc.) + */ + private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = { + if (text.length > 0 && text(0) == 'v') { + val version = + try { text.substring(1, text.length).toInt } + catch { + case _: NumberFormatException => + throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + + s"version from $text.") + } + if (version > 0) { + if (version > maxSupportedVersion) { + throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " + + s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + + s"by a newer version of Spark and cannot be read by this version. Please upgrade.") + } + else { + return version + } + } + } + + // reaching here means we failed to read the correct log version + throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + + s"version from $text.") + } } object HDFSMetadataLog { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index 3210d8ad64e2..4f8cd116f610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") } - val version = lines.next() - if (version != OffsetSeqLog.VERSION) { - throw new IllegalStateException(s"Unknown log version: ${version}") - } + + val version = parseVersion(lines.next(), OffsetSeqLog.VERSION) // read metadata val metadata = lines.next().trim match { @@ -70,7 +68,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller - out.write(OffsetSeqLog.VERSION.getBytes(UTF_8)) + out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8)) // write metadata out.write('\n') @@ -88,6 +86,6 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) } object OffsetSeqLog { - private val VERSION = "v1" + private[streaming] val VERSION = 1 private val SERIALIZED_VOID_OFFSET = "-" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index 24d92a96237e..f35a8ab7e3da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext defaultMinBatchesToRetain = 1, compactibleLog => { val logs = Array("entry_1", "entry_2", "entry_3") - val expected = s"""${FakeCompactibleFileStreamLog.VERSION} + val expected = s"""v${FakeCompactibleFileStreamLog.VERSION} |"entry_1" |"entry_2" |"entry_3"""".stripMargin @@ -132,7 +132,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext baos.reset() compactibleLog.serialize(Array(), baos) - assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name())) + assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name())) }) } @@ -142,7 +142,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext defaultCompactInterval = 3, defaultMinBatchesToRetain = 1, compactibleLog => { - val logs = s"""${FakeCompactibleFileStreamLog.VERSION} + val logs = s"""v${FakeCompactibleFileStreamLog.VERSION} |"entry_1" |"entry_2" |"entry_3"""".stripMargin @@ -152,10 +152,38 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext assert(Nil === compactibleLog.deserialize( - new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8)))) + new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8)))) }) } + test("deserialization log written by future version") { + withTempDir { dir => + def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog = + new FakeCompactibleFileStreamLog( + version, + _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case + _defaultCompactInterval = 3, // this param does not matter here in this test case + _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case + spark, + dir.getCanonicalPath) + + val writer = newFakeCompactibleFileStreamLog(version = 2) + val reader = newFakeCompactibleFileStreamLog(version = 1) + writer.add(0, Array("entry")) + val e = intercept[IllegalStateException] { + reader.get(0) + } + Seq( + s"Failed to read log file ${dir.getCanonicalPath}/0", + "UnsupportedLogVersion: maximum supported log version is v1, but encountered v2", + "The log file was produced by a newer version of Spark and cannot be read by this version", + "Please upgrade" + ).foreach { message => + assert(e.getMessage.contains(message)) + } + } + } + test("compact") { withFakeCompactibleFileStreamLog( fileCleanupDelayMs = Long.MaxValue, @@ -219,6 +247,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext ): Unit = { withTempDir { file => val compactibleLog = new FakeCompactibleFileStreamLog( + FakeCompactibleFileStreamLog.VERSION, fileCleanupDelayMs, defaultCompactInterval, defaultMinBatchesToRetain, @@ -230,17 +259,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext } object FakeCompactibleFileStreamLog { - val VERSION = "test_version" + val VERSION = 1 } class FakeCompactibleFileStreamLog( + metadataLogVersion: Int, _fileCleanupDelayMs: Long, _defaultCompactInterval: Int, _defaultMinBatchesToRetain: Int, sparkSession: SparkSession, path: String) extends CompactibleFileStreamLog[String]( - FakeCompactibleFileStreamLog.VERSION, + metadataLogVersion, sparkSession, path ) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 340d2945acd4..dd3a414659c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { action = FileStreamSinkLog.ADD_ACTION)) // scalastyle:off - val expected = s"""$VERSION + val expected = s"""v$VERSION |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin @@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { assert(expected === baos.toString(UTF_8.name())) baos.reset() sinkLog.serialize(Array(), baos) - assert(VERSION === baos.toString(UTF_8.name())) + assert(s"v$VERSION" === baos.toString(UTF_8.name())) } } test("deserialize") { withFileStreamSinkLog { sinkLog => // scalastyle:off - val logs = s"""$VERSION + val logs = s"""v$VERSION |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin @@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) - assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8)))) + assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8)))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 55750b920298..a543c3d9c6e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -127,6 +127,34 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } + test("HDFSMetadataLog: parseVersion") { + withTempDir { dir => + val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath) + def assertLogFileMalformed(func: => Int): Unit = { + val e = intercept[IllegalStateException] { func } + assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version")) + } + assertLogFileMalformed { metadataLog.parseVersion("", 100) } + assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) } + assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) } + assertLogFileMalformed { metadataLog.parseVersion("10", 100) } + assertLogFileMalformed { metadataLog.parseVersion("v0", 100) } + assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) } + + assert(metadataLog.parseVersion("v10", 10) === 10) + assert(metadataLog.parseVersion("v10", 100) === 10) + + val e = intercept[IllegalStateException] { 
metadataLog.parseVersion("v200", 100) } + Seq( + s"UnsupportedLogVersion: maximum supported log version is v100, but encountered v200", + "The log file was produced by a newer version of Spark and cannot be read by this version", + "Please upgrade" + ).foreach { message => + assert(e.getMessage.contains(message)) + } + } + } + test("HDFSMetadataLog: restart") { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 5ae8b2484d2e..159d52b244aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.io.File import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.test.SharedSQLContext class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { @@ -70,6 +71,25 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { } } + test("deserialization log written by future version") { + withTempDir { dir => + stringToFile(new File(dir, "0"), "v99999") + val log = new OffsetSeqLog(spark, dir.getCanonicalPath) + val e = intercept[IllegalStateException] { + log.get(0) + } + Seq( + s"Failed to read log file ${dir.getCanonicalPath}/0", + s"UnsupportedLogVersion: maximum supported log version is v${OffsetSeqLog.VERSION}, but " + + s"encountered v99999", + "The log file was produced by a newer version of Spark and cannot be read by this version", + "Please upgrade" + ).foreach { message => + assert(e.getMessage.contains(message)) + } + } + } + test("read Spark 2.1.0 log format") { val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0") assert(batchId === 0) From 18f77b0db2ccb9198ac3fd554633fece25e26686 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 26 Feb 2017 19:38:02 +0800 Subject: [PATCH 2/3] Address Sean's comments --- .../org/apache/spark/sql/kafka010/KafkaSource.scala | 3 +-- .../org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 7 ++----- .../spark/sql/execution/streaming/HDFSMetadataLog.scala | 9 ++++----- .../streaming/CompactibleFileStreamLogSuite.scala | 6 ++---- .../sql/execution/streaming/HDFSMetadataLogSuite.scala | 5 ++--- .../sql/execution/streaming/OffsetSeqLogSuite.scala | 7 ++----- 6 files changed, 13 insertions(+), 24 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index a9addaf3b03c..1fb0a338299b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -100,8 +100,7 @@ private[kafka010] class KafkaSource( override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517) val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) - writer.write("v" + VERSION) - writer.write("\n") + writer.write("v" + VERSION + "\n") writer.write(metadata.json) writer.flush } diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 5db3422c697f..8645bd39bc6f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -227,11 +227,8 @@ class KafkaSourceSuite extends KafkaSourceTest { } Seq( - s"Failed to read log file ${metadataPath.getCanonicalPath}/0", - s"UnsupportedLogVersion: maximum supported log version is v${KafkaSource.VERSION}, but " + - s"encountered v99999", - "The log file was produced by a newer version of Spark and cannot be read by this version", - "Please upgrade" + s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999", + "produced by a newer version of Spark and cannot be read by this version" ).foreach { message => assert(e.getMessage.contains(message)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 4837409cad18..8729b62386cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming import java.io._ -import java.lang.IllegalStateException import java.nio.charset.StandardCharsets import java.util.{ConcurrentModificationException, EnumSet, UUID} @@ -283,8 +282,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = { if (text.length > 0 && text(0) == 'v') { val version = - try { text.substring(1, text.length).toInt } - catch { + try { + text.substring(1, text.length).toInt + } catch { case _: NumberFormatException => throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + s"version from $text.") @@ -294,8 +294,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " + s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.") - } - else { + } else { return version } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index f35a8ab7e3da..20ac06f048c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -174,10 +174,8 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext reader.get(0) } Seq( - s"Failed to read log file ${dir.getCanonicalPath}/0", - "UnsupportedLogVersion: maximum supported log version is v1, but encountered v2", - "The log file was produced by a newer version of Spark and cannot be read by this version", - "Please upgrade" + "maximum supported log version is v1, but encountered v2", + "produced by a newer version of Spark and cannot be read by this version" ).foreach { message => assert(e.getMessage.contains(message)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index a543c3d9c6e9..662c4466b21b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -146,9 +146,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) } Seq( - s"UnsupportedLogVersion: maximum supported log version is v100, but encountered v200", - "The log file was produced by a newer version of Spark and cannot be read by this version", - "Please upgrade" + "maximum supported log version is v100, but encountered v200", + "produced by a newer version of Spark and cannot be read by this version" ).foreach { message => assert(e.getMessage.contains(message)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 159d52b244aa..f7f0dade8717 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -79,11 +79,8 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { log.get(0) } Seq( - s"Failed to read log file ${dir.getCanonicalPath}/0", - s"UnsupportedLogVersion: maximum supported log version is v${OffsetSeqLog.VERSION}, but " + - s"encountered v99999", - "The log file was produced by a newer version of Spark and cannot be read by this version", - "Please upgrade" + s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999", + "produced by a newer version of Spark and cannot be read by this version" ).foreach { message => assert(e.getMessage.contains(message)) } From 32c00179feb730b5f30ec5282a41d60357ad322f Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Thu, 16 Mar 2017 15:14:04 +0800 Subject: [PATCH 3/3] Address comments from Ryan --- .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 8729b62386cf..60ce64261c4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -199,7 +199,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: case ise: IllegalStateException => // re-throw the exception with the log file path added throw new IllegalStateException( - s"Failed to read log file $batchMetadataFile. ${ise.getMessage}") + s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise) } finally { IOUtils.closeQuietly(input) }
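
The three commits above converge on one convention: every streaming metadata file begins with a `v<N>` header line, and readers fail fast — with distinct messages for a malformed header versus a header written by a newer Spark — before touching the payload. Below is a minimal standalone sketch of that round trip; `VersionedLogSketch` and its members are illustrative stand-ins for this summary, not the patched Spark classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.Source

object VersionedLogSketch {
  // Mirrors e.g. OffsetSeqLog.VERSION = 1: the newest format this reader understands.
  val maxSupportedVersion = 1

  // Writer side: a "v<N>" header line, then the payload.
  def serialize(version: Int, payload: String, out: OutputStream): Unit = {
    out.write(("v" + version + "\n").getBytes(UTF_8))
    out.write(payload.getBytes(UTF_8))
  }

  // Reader side, same rules as the patch's parseVersion: accept only
  // "v" + a positive integer, and refuse versions newer than we support.
  def parseVersion(text: String): Int = {
    if (text.nonEmpty && text(0) == 'v') {
      val version =
        try {
          text.substring(1).toInt
        } catch {
          case _: NumberFormatException =>
            throw new IllegalStateException(
              s"Log file was malformed: failed to read correct log version from $text.")
        }
      if (version > 0) {
        if (version > maxSupportedVersion) {
          throw new IllegalStateException(
            s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
              s"but encountered v$version. The log file was produced by a newer version of " +
              "Spark and cannot be read by this version. Please upgrade.")
        }
        return version
      }
    }
    throw new IllegalStateException(
      s"Log file was malformed: failed to read correct log version from $text.")
  }

  def deserialize(in: InputStream): String = {
    val lines = Source.fromInputStream(in, UTF_8.name()).getLines()
    if (!lines.hasNext) throw new IllegalStateException("Incomplete log file")
    parseVersion(lines.next()) // validate the header before reading anything else
    lines.mkString("\n")       // the rest is the payload
  }

  def main(args: Array[String]): Unit = {
    val ok = new ByteArrayOutputStream()
    serialize(1, """{"batchWatermarkMs":0}""", ok)
    println(deserialize(new ByteArrayInputStream(ok.toByteArray)))    // round-trips

    val future = new ByteArrayOutputStream()
    serialize(99999, "{}", future)                                    // a "newer Spark" wrote this
    try deserialize(new ByteArrayInputStream(future.toByteArray))
    catch { case e: IllegalStateException => println(e.getMessage) }  // fails fast, clear message
  }
}
```

This is also why the patches change `VERSION` from strings like `"v1\n"` to the integer `1`: the `"v"` prefix and newline become a serialization detail in one place, and version comparison becomes plain integer arithmetic instead of string matching.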
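KafkaSource adds one wrinkle on top of that convention: its offset log keeps the leading zero byte from SPARK-19517 so Spark 2.1.0 can still open the file, and a body that does not start with `v` is taken to be the old, version-less Spark 2.1.0 format. A sketch of that branch, assuming `content` is the file body with the zero byte already consumed (as in the real deserializer); the helper and sample JSON are illustrative:

```scala
object KafkaOffsetLogSketch {
  // Layout of the KafkaSource offset log after this series:
  //   "v<N>\n" + json  -> written by this patch (header checked via parseVersion)
  //   json only        -> written by Spark 2.1.0, which had no version line
  def readKafkaOffsetJson(content: String): String = {
    assert(content.nonEmpty) // HDFSMetadataLog never creates a partial file
    if (content(0) == 'v') {
      val indexOfNewLine = content.indexOf("\n")
      if (indexOfNewLine > 0) {
        // the real code validates content.substring(0, indexOfNewLine) here
        content.substring(indexOfNewLine + 1)
      } else {
        throw new IllegalStateException(
          "Log file was malformed: failed to detect the log file version line.")
      }
    } else {
      content // no version line: the log was generated by Spark 2.1.0
    }
  }

  def main(args: Array[String]): Unit = {
    println(readKafkaOffsetJson("v1\n{\"topic-0\":{\"0\":100}}")) // new format
    println(readKafkaOffsetJson("{\"topic-0\":{\"0\":100}}"))     // Spark 2.1.0 format
  }
}
```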
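Finally, the one-line third commit is about debuggability: `HDFSMetadataLog.get` re-throws `IllegalStateException` with the batch file path prepended, and passing the caught exception as the constructor's `cause` argument keeps the original stack trace in the report. A minimal illustration, with a hypothetical path:

```scala
object CausePreservationSketch {
  def readLog(path: String): Nothing =
    try {
      // stand-in for deserialize() failing on a newer log version
      throw new IllegalStateException("UnsupportedLogVersion: encountered v99999")
    } catch {
      case ise: IllegalStateException =>
        // Patch 3: pass `ise` as the cause so the original stack trace
        // survives; before the fix, getCause on the re-thrown exception was null.
        throw new IllegalStateException(
          s"Failed to read log file $path. ${ise.getMessage}", ise)
    }

  def main(args: Array[String]): Unit = {
    try readLog("/metadata/0")
    catch {
      case e: IllegalStateException =>
        assert(e.getCause != null) // the underlying failure is preserved
        e.printStackTrace()        // prints a "Caused by:" chain
    }
  }
}
```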