
Commit acab31e

Author: Tyson Condie (committed)
resolving broken tests
1 parent 8ee336d commit acab31e

12 files changed: +85 -54 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala

Lines changed: 0 additions & 4 deletions
@@ -29,10 +29,6 @@ private[kafka010]
 case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
 
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
-
-  override def toString(): String = {
-    partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
-  }
 }
 
 /** Companion object of the [[KafkaSourceOffset]] */

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 2 additions & 2 deletions
@@ -131,8 +131,8 @@ class FileStreamSource(
    * Returns the data that is between the offsets (`start`, `end`].
    */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startId = start.map(LongOffset.convert(_).offset).getOrElse(-1L)
-    val endId = LongOffset.convert(end).offset
+    val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset
+    val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset
 
     assert(startId <= endId)
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala

Lines changed: 4 additions & 5 deletions
@@ -26,8 +26,6 @@ case class LongOffset(offset: Long) extends Offset {
 
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
-
-  override def toString: String = s"#$offset"
 }
 
 object LongOffset {
@@ -42,8 +40,9 @@ object LongOffset {
    * Convert generic Offset to LongOffset if possible.
    * @return converted LongOffset
    */
-  def convert(offset: Offset): LongOffset = offset match {
-    case lo: LongOffset => lo
-    case so: SerializedOffset => LongOffset(so)
+  def convert(offset: Offset): Option[LongOffset] = offset match {
+    case lo: LongOffset => Some(lo)
+    case so: SerializedOffset => Some(LongOffset(so))
+    case _ => None
   }
 }
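Since convert now returns Option[LongOffset] rather than failing with a MatchError on an unrecognized Offset implementation, call sites unwrap the result explicitly. A minimal sketch of the two patterns this commit uses elsewhere (the offset values are illustrative only):

    // Sketch only: unwrap the Option with a default, or fail with an explicit error.
    val start: Option[Offset] = Some(SerializedOffset("5"))   // e.g. read back from the offset log
    val startId: Long =
      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1L)).offset

    val end: Offset = LongOffset(7)
    LongOffset.convert(end) match {
      case Some(lo) => assert(lo.offset == 7)
      case None     => sys.error(s"offset $end cannot be treated as a LongOffset")
    }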

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala

Lines changed: 14 additions & 0 deletions
@@ -25,9 +25,23 @@ package org.apache.spark.sql.execution.streaming
  */
 abstract class Offset {
 
+  /**
+   * Equality based on JSON string representation.
+   */
+  override final def equals(obj: Any): Boolean = obj match {
+    case o: Offset => this.json == o.json
+    case _ => false
+  }
+
+  override def hashCode(): Int = this.json.hashCode
+
+  override def toString(): String = this.json.toString
+
   /**
    * A JSON-serialized representation of an Offset that is
    * used for saving offsets to the offset log.
+   * Note: We assume that equivalent/equal offsets serialize to
+   * identical JSON strings.
    *
    * @return JSON string encoding
    */
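With equals, hashCode, and toString now derived from json, offsets of different concrete classes are interchangeable whenever they serialize to the same JSON, which is exactly what happens when an offset is read back from the log as a SerializedOffset. A minimal sketch, assuming SerializedOffset simply carries the JSON string it wraps (partition and offset values are illustrative):

    // Sketch only: an offset reported by a source and its logged form compare equal.
    val fromSource: Offset = KafkaSourceOffset(Map(new TopicPartition("topic", 0) -> 100L))
    val fromLog: Offset    = SerializedOffset(fromSource.json)
    assert(fromSource == fromLog)                     // same JSON string => equal
    assert(fromSource.hashCode == fromLog.hashCode)   // and the same hash code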

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ case class OffsetSeq(offsets: Seq[Option[Offset]]) {
   }
 
   override def toString: String =
-    offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
+    offsets.map(_.map(_.json).getOrElse("-")).mkString("[", ", ", "]")
 }
 
 object OffsetSeq {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala

Lines changed: 27 additions & 3 deletions
@@ -25,11 +25,31 @@ import scala.io.{Source => IOSource}
 
 import org.apache.spark.sql.SparkSession
 
+/**
+ * This class is used to log offsets to persistent files in HDFS.
+ * Each file corresponds to a specific batch of offsets. The file
+ * format contains a version string in the first line, followed
+ * by the JSON string representation of the offsets, separated
+ * by newline characters. If a source offset is missing, then
+ * that line will contain a string value defined in the
+ * SERIALIZED_VOID_OFFSET variable of the [[OffsetSeqLog]] companion object.
+ * For instance, when dealing with [[LongOffset]] types:
+ *   v1    // version 1
+ *   {0}   // LongOffset 0
+ *   {3}   // LongOffset 3
+ *   -     // No offset for this source, i.e., an invalid JSON string
+ *   {2}   // LongOffset 2
+ *   ...
+ */
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
 
   override protected def deserialize(in: InputStream): OffsetSeq = {
     // called inside a try-finally where the underlying stream is closed in the caller
+    def parseOffset(value: String): Offset = value match {
+      case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
+      case json => SerializedOffset(json)
+    }
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
@@ -38,19 +58,23 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
     if (version != OffsetSeqLog.VERSION) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    OffsetSeq.fill(lines.map(offset => SerializedOffset(offset)).toArray: _*)
+    OffsetSeq.fill(lines.map(parseOffset).toArray: _*)
   }
 
   override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
-    metadata.offsets.map(_.map(_.json)).flatten.foreach { offset =>
+    metadata.offsets.map(_.map(_.json)).foreach { offset =>
       out.write('\n')
-      out.write(offset.getBytes(UTF_8))
+      offset match {
+        case Some(json: String) => out.write(json.getBytes(UTF_8))
+        case None => out.write(OffsetSeqLog.SERIALIZED_VOID_OFFSET.getBytes(UTF_8))
+      }
     }
   }
 }
 
 object OffsetSeqLog {
   private val VERSION = "v1"
+  private val SERIALIZED_VOID_OFFSET = "-"
 }
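A rough sketch of the round trip the "-" sentinel enables, assuming OffsetSeq.fill wraps each parsed value with Option(_) so that the null returned by parseOffset becomes None (the JSON strings below are hypothetical, not actual LongOffset encodings):

    // Sketch only: a batch where the second of three sources has no offset yet.
    val recovered: OffsetSeq = OffsetSeq.fill(
      SerializedOffset("""{"logOffset":0}"""),   // hypothetical JSON for source 0
      null,                                      // produced by parseOffset for a "-" line
      SerializedOffset("""{"logOffset":3}"""))   // hypothetical JSON for source 2

    assert(recovered.offsets(1).isEmpty)   // the missing offset round-trips to None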

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -683,7 +683,7 @@ class StreamExecution(
     val sourceStatuses = sources.map { s =>
       SourceStatus(
         s.toString,
-        localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: use json if available
+        localAvailableOffsets.get(s).map(_.json).getOrElse("-"),
         streamMetrics.currentSourceInputRate(s),
         streamMetrics.currentSourceProcessingRate(s),
         streamMetrics.currentSourceTriggerDetails(s))

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 7 additions & 8 deletions
@@ -106,8 +106,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
-      start.map(LongOffset.convert(_)).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = LongOffset.convert(end).offset.toInt + 1
+      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
 
     // Internal buffer only holds the batches after lastCommittedOffset.
     val newBlocks = synchronized {
@@ -137,12 +137,11 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       batches.trimStart(offsetDiff)
       lastOffsetCommitted = newOffset
     }
-    end match {
-      case newOffset: LongOffset => check(newOffset)
-      case so: SerializedOffset => check(LongOffset(so))
-      case _ =>
-        sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
-          "an instance of this class")
+
+    LongOffset.convert(end) match {
+      case Some(lo) => check(lo)
+      case None => sys.error(s"MemoryStream.commit() received an offset ($end) " +
+        "that did not originate with an instance of this class")
     }
   }
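Because the dispatch now goes through LongOffset.convert, MemoryStream.commit() accepts a SerializedOffset recovered from the offset log as readily as a LongOffset it produced itself; only genuinely foreign offset types hit the error branch. A small illustrative sketch (stream setup elided, offset values hypothetical):

    // Sketch only: both forms take the Some(lo) branch inside commit().
    memoryStream.commit(LongOffset(0))           // offset produced by this stream
    memoryStream.commit(SerializedOffset("0"))   // the same offset, as re-read from the log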

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala

Lines changed: 12 additions & 13 deletions
@@ -116,8 +116,8 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
     val startOrdinal =
-      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
 
     // Internal buffer only holds the batches after lastOffsetCommitted
     val rawList = synchronized {
@@ -140,20 +140,19 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   }
 
   override def commit(end: Offset): Unit = synchronized {
-    if (end.isInstanceOf[LongOffset]) {
-      val newOffset = end.asInstanceOf[LongOffset]
-      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
-
-      if (offsetDiff < 0) {
-        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-      }
-
-      batches.trimStart(offsetDiff)
-      lastOffsetCommitted = newOffset
-    } else {
+    val newOffset = LongOffset.convert(end).getOrElse(
       sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
         s"originate with an instance of this class")
+    )
+
+    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+    if (offsetDiff < 0) {
+      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
     }
+
+    batches.trimStart(offsetDiff)
+    lastOffsetCommitted = newOffset
   }
 
   /** Stop this source. */
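The rewritten commit() relies on sys.error returning Nothing, so the error call can serve as the getOrElse fallback while the whole expression still types as LongOffset. A standalone sketch of the idiom (requireLong is a hypothetical helper, not part of the commit):

    // Sketch only: a Nothing-typed fallback doubles as a guard clause.
    def requireLong(end: Offset): LongOffset =
      LongOffset.convert(end).getOrElse(
        sys.error(s"unexpected offset type: $end"))   // Nothing <: LongOffset, so this compiles

    requireLong(LongOffset(5))           // LongOffset(5)
    requireLong(SerializedOffset("5"))   // converted to LongOffset(5)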

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ private[sql] object StreamingQueryStatus {
       sourceStatuses = Array(
         SourceStatus(
           desc = "MySource1",
-          offsetDesc = LongOffset(0).toString,
+          offsetDesc = LongOffset(0).json,
           inputRate = 15.5,
           processingRate = 23.5,
           triggerDetails = Map(
