Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ trait StreamTest extends QueryTest with Timeouts {
}

val testThread = Thread.currentThread()
val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

try {
startedTest.foreach { action =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
@volatile var query: StreamExecution = null
try {
val df = ds.toDF
val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
val metadataRoot =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
query = sqlContext
.streams
.startQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._

private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

after {
sqlContext.streams.active.foreach(_.stop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
val inputData = MemoryStream[Int]
val df = inputData.toDF()

val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

val query =
df.write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}

test("read from text files") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
Expand All @@ -224,8 +224,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}

test("read from json files") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
val filtered = textSource.toDF().filter($"value" contains "keep")
Expand Down Expand Up @@ -258,8 +258,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}

test("read from json files with inferring schema") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
Expand All @@ -279,8 +279,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}

test("read from parquet files") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
val filtered = fileSource.toDF().filter($"value" contains "keep")
Expand All @@ -301,7 +301,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}

test("file stream source without schema") {
val src = Utils.createTempDir("streaming.src")
val src = Utils.createTempDir(namePrefix = "streaming.src")

// Only "text" doesn't need a schema
createFileStreamSource("text", src.getCanonicalPath)
Expand All @@ -318,8 +318,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}

test("fault tolerance") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
Expand All @@ -346,8 +346,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
import testImplicits._

test("file source stress test") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

val textSource = createFileStreamSource("text", src.getCanonicalPath)
val ds = textSource.toDS[String]().map(_.toInt + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ class FileStressSuite extends StreamTest with SharedSQLContext {

test("fault tolerance stress test") {
val numRecords = 10000
val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

@volatile
var continue = true
Expand Down