@@ -17,15 +17,13 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
-import java.nio.file.Files
 import java.util.Optional
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
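The hunks that follow all make the same change: each test's ad-hoc Files.createTempDirectory(...) directory, which was never cleaned up, is replaced by withTempDir, presumably the temp-directory helper this suite inherits from Spark's SQL test utilities, and the resulting path is passed to the reader as its checkpoint/metadata location. A minimal, self-contained sketch of that pattern, under that assumption (the names below are illustrative, not Spark's actual helper):

// Standalone sketch of a withTempDir-style helper (an assumption about what the
// suite's helper does, not Spark's actual implementation): create a temp
// directory, hand it to the test body, and delete it afterwards even on failure.
import java.io.File
import java.nio.file.Files

object WithTempDirSketch {
  def withTempDir[T](body: File => T): T = {
    val dir = Files.createTempDirectory("rate-source-test").toFile
    try body(dir)
    finally deleteRecursively(dir)
  }

  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, hence the Option wrapper.
    Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete() // removes the file, or the directory once it is empty
  }

  def main(args: Array[String]): Unit = {
    val used = withTempDir { temp => temp.getCanonicalPath }
    println(s"temp dir used and cleaned up: $used")
  }
}

With a helper of this shape, every test gets an isolated directory that disappears when the body returns or throws, which is exactly what passing temp.getCanonicalPath into createMicroBatchReader and RateStreamMicroBatchReader below relies on.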
@@ -54,12 +52,15 @@ class RateSourceSuite extends StreamTest {
   }
 
   test("microbatch in registry") {
-    DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
-      case ds: MicroBatchReadSupport =>
-        val reader = ds.createMicroBatchReader(Optional.empty(), "dummy", DataSourceOptions.empty())
-        assert(reader.isInstanceOf[RateStreamMicroBatchReader])
-      case _ =>
-        throw new IllegalStateException("Could not find read support for rate")
+    withTempDir { temp =>
+      DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
+        case ds: MicroBatchReadSupport =>
+          val reader = ds.createMicroBatchReader(
+            Optional.empty(), temp.getCanonicalPath, DataSourceOptions.empty())
+          assert(reader.isInstanceOf[RateStreamMicroBatchReader])
+        case _ =>
+          throw new IllegalStateException("Could not find read support for rate")
+      }
     }
   }
 
@@ -108,69 +109,75 @@ class RateSourceSuite extends StreamTest {
   }
 
   test("microbatch - set offset") {
-    val temp = Files.createTempDirectory("dummy").toString
-    val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), temp)
-    val startOffset = LongOffset(0L)
-    val endOffset = LongOffset(1L)
-    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    assert(reader.getStartOffset() == startOffset)
-    assert(reader.getEndOffset() == endOffset)
+    withTempDir { temp =>
+      val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), temp.getCanonicalPath)
+      val startOffset = LongOffset(0L)
+      val endOffset = LongOffset(1L)
+      reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+      assert(reader.getStartOffset() == startOffset)
+      assert(reader.getEndOffset() == endOffset)
+    }
   }
 
test("microbatch - infer offsets") {
val tempFolder = Files.createTempDirectory("dummy").toString
val reader = new RateStreamMicroBatchReader(
new DataSourceOptions(
Map("numPartitions" -> "1", "rowsPerSecond" -> "100", "useManualClock" -> "true").asJava),
tempFolder)
reader.clock.asInstanceOf[ManualClock].advance(100000)
reader.setOffsetRange(Optional.empty(), Optional.empty())
reader.getStartOffset() match {
case r: LongOffset => assert(r.offset === 0L)
case _ => throw new IllegalStateException("unexpected offset type")
}
reader.getEndOffset() match {
case r: LongOffset => assert(r.offset >= 100)
case _ => throw new IllegalStateException("unexpected offset type")
withTempDir { temp =>
val reader = new RateStreamMicroBatchReader(
new DataSourceOptions(
Map("numPartitions" -> "1", "rowsPerSecond" -> "100", "useManualClock" -> "true").asJava),
temp.getCanonicalPath)
reader.clock.asInstanceOf[ManualClock].advance(100000)
reader.setOffsetRange(Optional.empty(), Optional.empty())
reader.getStartOffset() match {
case r: LongOffset => assert(r.offset === 0L)
case _ => throw new IllegalStateException("unexpected offset type")
}
reader.getEndOffset() match {
case r: LongOffset => assert(r.offset >= 100)
case _ => throw new IllegalStateException("unexpected offset type")
}
}
}

test("microbatch - predetermined batch size") {
val temp = Files.createTempDirectory("dummy").toString
val reader = new RateStreamMicroBatchReader(
new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava), temp)
val startOffset = LongOffset(0L)
val endOffset = LongOffset(1L)
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
val tasks = reader.planInputPartitions()
assert(tasks.size == 1)
val dataReader = tasks.get(0).createPartitionReader()
val data = ArrayBuffer[Row]()
while (dataReader.next()) {
data.append(dataReader.get())
withTempDir { temp =>
val reader = new RateStreamMicroBatchReader(
new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava),
temp.getCanonicalPath)
val startOffset = LongOffset(0L)
val endOffset = LongOffset(1L)
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
val tasks = reader.planInputPartitions()
assert(tasks.size == 1)
val dataReader = tasks.get(0).createPartitionReader()
val data = ArrayBuffer[Row]()
while (dataReader.next()) {
data.append(dataReader.get())
}
assert(data.size === 20)
}
assert(data.size === 20)
}

test("microbatch - data read") {
val temp = Files.createTempDirectory("dummy").toString
val reader = new RateStreamMicroBatchReader(
new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava), temp)
val startOffset = LongOffset(0L)
val endOffset = LongOffset(1L)
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
val tasks = reader.planInputPartitions()
assert(tasks.size == 11)

val readData = tasks.asScala
.map(_.createPartitionReader())
.flatMap { reader =>
val buf = scala.collection.mutable.ListBuffer[Row]()
while (reader.next()) buf.append(reader.get())
buf
}
withTempDir { temp =>
val reader = new RateStreamMicroBatchReader(
new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava),
temp.getCanonicalPath)
val startOffset = LongOffset(0L)
val endOffset = LongOffset(1L)
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
val tasks = reader.planInputPartitions()
assert(tasks.size == 11)

val readData = tasks.asScala
.map(_.createPartitionReader())
.flatMap { reader =>
val buf = scala.collection.mutable.ListBuffer[Row]()
while (reader.next()) buf.append(reader.get())
buf
}

assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
}
}

test("valueAtSecond") {
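The row-count assertions in the two data-reading tests above are simple rate arithmetic: a one-second offset window at rowsPerSecond = 20 should yield 20 rows, and at rowsPerSecond = 33 the 11 partitions together must produce exactly the values 0 until 33. A standalone back-of-the-envelope sketch of that arithmetic (the helper and the round-robin split below are illustrative assumptions, not the rate source's actual implementation):

// Illustrative sketch, not the rate source's API: the arithmetic behind the
// assertions above, assuming rowsPerSecond values per elapsed second, split
// across numPartitions readers.
object RateArithmeticSketch {
  // Rows expected between two second-granularity offsets.
  def expectedRows(rowsPerSecond: Long, startSec: Long, endSec: Long): Long =
    rowsPerSecond * (endSec - startSec)

  def main(args: Array[String]): Unit = {
    // "microbatch - predetermined batch size": 20 rows/sec over the (0, 1] window => 20 rows.
    assert(expectedRows(rowsPerSecond = 20, startSec = 0, endSec = 1) == 20)

    // "microbatch - data read": 33 rows/sec over one second, read back from 11 partitions.
    // Simulate one possible split (round-robin by partition index) and check that the
    // recombined, sorted values are exactly 0 until 33, as the test asserts.
    val total = expectedRows(rowsPerSecond = 33, startSec = 0, endSec = 1)
    val numPartitions = 11
    val perPartition = (0 until numPartitions).map(p => p.toLong until total by numPartitions.toLong)
    assert(perPartition.flatten.sorted == (0L until total))
  }
}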