
Commit 8ee336d

Author: Tyson Condie
Commit message: update
Parent: d6fec94

4 files changed, +16 -16 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala

Lines changed: 2 additions & 6 deletions
@@ -58,11 +58,7 @@ private[kafka010] object KafkaSourceOffset {
 
   /**
    * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
-   * or [[KafkaSourceOffset]]
    */
-  def apply(offset: Offset): KafkaSourceOffset = offset match {
-    case kso: KafkaSourceOffset => KafkaSourceOffset(kso.partitionToOffsets)
-    case so: SerializedOffset =>
-      KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
-  }
+  def apply(offset: SerializedOffset): KafkaSourceOffset =
+    KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
 }
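
For illustration, a minimal sketch of calling the narrowed factory after this change; the JSON payload below is hypothetical and follows the {"topic":{"partition":offset}} shape that JsonUtils.partitionOffsets parses:

    // Hypothetical payload, for illustration only. The factory now accepts
    // SerializedOffset exclusively instead of matching on the Offset subtype.
    val serialized = SerializedOffset("""{"topicA":{"0":23,"1":-1}}""")
    val kso: KafkaSourceOffset = KafkaSourceOffset(serialized)
    // kso.partitionToOffsets maps each TopicPartition of "topicA" to its offset.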

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 2 additions & 2 deletions
@@ -131,8 +131,8 @@ class FileStreamSource(
    * Returns the data that is between the offsets (`start`, `end`].
    */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startId = start.map(LongOffset(_).offset).getOrElse(-1L)
-    val endId = LongOffset(end).offset
+    val startId = start.map(LongOffset.convert(_).offset).getOrElse(-1L)
+    val endId = LongOffset.convert(end).offset
 
     assert(startId <= endId)
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
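
A worked example of the (`start`, `end`] contract with hypothetical offsets, showing the values the rewritten lines produce:

    // Hypothetical offsets, for illustration only.
    val start: Option[Offset] = Some(LongOffset(2))
    val end: Offset = LongOffset(5)
    val startId = start.map(LongOffset.convert(_).offset).getOrElse(-1L)  // 2
    val endId = LongOffset.convert(end).offset                            // 5
    // metadataLog.get(Some(3), Some(5)) then selects batches 3..5,
    // i.e. the half-open offset range (2, 5].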

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala

Lines changed: 10 additions & 6 deletions
@@ -33,13 +33,17 @@ case class LongOffset(offset: Long) extends Offset {
 object LongOffset {
 
   /**
-   * LongOffset factory from Offset.
-   * @param offset used to initialize LongOffset. Assumed to be
-   *               either a LongOffset or SerializedOffset
+   * LongOffset factory from serialized offset.
    * @return new LongOffset
    */
-  def apply(offset: Offset) : LongOffset = offset match {
-    case lo: LongOffset => new LongOffset(lo.offset)
-    case so: SerializedOffset => new LongOffset(offset.json.toLong)
+  def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
+
+  /**
+   * Convert generic Offset to LongOffset if possible.
+   * @return converted LongOffset
+   */
+  def convert(offset: Offset): LongOffset = offset match {
+    case lo: LongOffset => lo
+    case so: SerializedOffset => LongOffset(so)
   }
 }
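
A minimal sketch of the resulting split, assuming SerializedOffset(json: String) from the same streaming package: apply now only deserializes, while convert accepts either concrete form (any other Offset subtype would fail the match):

    // Deserialization path: SerializedOffset carries the raw JSON string "42".
    val fromJson: LongOffset = LongOffset(SerializedOffset("42"))          // LongOffset(42)

    // Conversion path: an existing LongOffset now passes through unchanged
    // instead of being copied, as in the old apply(offset: Offset).
    val passedThrough: LongOffset = LongOffset.convert(LongOffset(7))      // LongOffset(7)
    val converted: LongOffset = LongOffset.convert(SerializedOffset("7"))  // LongOffset(7)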

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 2 additions & 2 deletions
@@ -106,8 +106,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
-      start.map(LongOffset(_)).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = LongOffset(end).offset.toInt + 1
+      start.map(LongOffset.convert(_)).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = LongOffset.convert(end).offset.toInt + 1
 
     // Internal buffer only holds the batches after lastCommittedOffset.
     val newBlocks = synchronized {
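
The same conversion drives the ordinal arithmetic here; with hypothetical offsets, the half-open batch range works out as:

    // Hypothetical values, for illustration only. A fresh stream has start = None,
    // so the LongOffset(-1) default yields startOrdinal = 0.
    val start: Option[Offset] = None
    val end: Offset = LongOffset(3)
    val startOrdinal =
      start.map(LongOffset.convert(_)).getOrElse(LongOffset(-1)).offset.toInt + 1  // 0
    val endOrdinal = LongOffset.convert(end).offset.toInt + 1                      // 4
    // Batches with ordinals in [0, 4), i.e. the first four batches, are fetched.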
