@@ -279,28 +279,6 @@ case class DataSource(
}
}

/**
* Returns true if there is a single path that has a metadata log indicating which files should
* be read.
*/
def hasMetadata(path: Seq[String]): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
val res = fs.exists(metadataPath)
res
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}

/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
@@ -331,7 +309,9 @@ case class DataSource(
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
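For context, a minimal batch-read sketch of the code path this hunk changes (the output directory "/tmp/q1-out" and the SparkSession `spark` are illustrative assumptions, not part of the patch): when the single, non-glob input path contains a _spark_metadata directory, resolveRelation() builds a MetadataLogFileIndex instead of listing files with HDFS APIs.

// Sketch only: batch-reading the output directory of a streaming file sink.
// "/tmp/q1-out" and the SparkSession `spark` are assumptions for illustration.
val df = spark.read
  .format("text")
  .load("/tmp/q1-out")   // resolution goes through FileStreamSink.hasMetadata(...)
// Only files recorded under /tmp/q1-out/_spark_metadata are read; files that were never
// committed by the sink are ignored.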
@@ -17,6 +17,9 @@

package org.apache.spark.sql.execution.streaming

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
@@ -25,9 +28,31 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}

object FileStreamSink {
object FileStreamSink extends Logging {
// The name of the subdirectory that is used to store metadata about which files are valid.
val metadataDir = "_spark_metadata"

/**
* Returns true if there is a single path that has a metadata log indicating which files should
* be read.
*/
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
val metadataPath = new Path(hdfsPath, metadataDir)
val res = fs.exists(metadataPath)
res
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}
}
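
A brief usage sketch of the relocated helper (the path, and obtaining the Hadoop configuration from `spark.sparkContext`, are illustrative assumptions): hasMetadata returns true only for a single, non-glob path whose _spark_metadata subdirectory exists on that path's file system.

// Sketch only; assumes an existing SparkSession `spark` and a hypothetical output path.
import org.apache.spark.sql.execution.streaming.FileStreamSink

val hadoopConf = spark.sparkContext.hadoopConfiguration
val isSinkOutput = FileStreamSink.hasMetadata(Seq("/tmp/q1-out"), hadoopConf)
// true only if /tmp/q1-out/_spark_metadata exists; a Seq with zero or multiple paths,
// or a failure while checking the file system, yields false.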

/**
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -43,8 +43,10 @@ class FileStreamSource(

private val sourceOptions = new FileStreamOptions(options)

private val hadoopConf = sparkSession.sessionState.newHadoopConf()

private val qualifiedBasePath: Path = {
val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
val fs = new Path(path).getFileSystem(hadoopConf)
fs.makeQualified(new Path(path)) // can contain glob patterns
}

@@ -157,14 +159,65 @@ class FileStreamSource(
checkFilesExist = false)))
}

/**
* If the source has a metadata log indicating which files should be read, then we should use it.
* Only when the user gives a non-glob path will we figure out whether the source has a
* metadata log.
*
* None means we don't know at the moment
* Some(true) means we know for sure the source DOES have metadata
* Some(false) means we know for sure the source DOES NOT have metadata
*/

Contributor Author

(Some notes here since the changes are not trivial.)

Here we're using sourceHasMetadata to indicate whether we know for sure the source has metadata, as stated in the source file comments:

  • None means we don't know at the moment
  • Some(true) means we know for sure the source DOES have metadata
  • Some(false) means we know for sure the source DOES NOT have metadata
@volatile private[sql] var sourceHasMetadata: Option[Boolean] =
if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None

private def allFilesUsingInMemoryFileIndex() = {
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
fileIndex.allFiles()
}

private def allFilesUsingMetadataLogFileIndex() = {
// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
// non-glob path
new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
}

/**
* Returns a list of files found, sorted by their timestamp.
*/
private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>

var allFiles: Seq[FileStatus] = null
sourceHasMetadata match {
case None =>
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
sourceHasMetadata = Some(true)
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
allFiles = allFilesUsingInMemoryFileIndex()
if (allFiles.isEmpty) {
// we still cannot decide
} else {
// decide what to use for future rounds
// double check whether source has metadata, preventing the extreme corner case that
// metadata log and data files are only generated after the previous
// `FileStreamSink.hasMetadata` check
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
sourceHasMetadata = Some(true)
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
sourceHasMetadata = Some(false)
// `allFiles` have already been fetched using InMemoryFileIndex in this round
}
}
}
case Some(true) => allFiles = allFilesUsingMetadataLogFileIndex()
case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
}

val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
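To tie the FileStreamSource changes together, here is a hedged end-to-end sketch (directory names and the SparkSession `spark` are assumptions): a second streaming query reads the text files committed by a first query's file sink, and sourceHasMetadata moves from None to Some(true) once the sink's log shows up.

// Sketch only; q1 (not shown) is a streaming query writing text files to /tmp/q1-out
// through the file sink. Paths and the SparkSession `spark` are illustrative assumptions.
import spark.implicits._

val q2 = spark.readStream
  .format("text")
  .load("/tmp/q1-out")                  // FileStreamSource starts with sourceHasMetadata = None
  .filter($"value".contains("keep"))
  .writeStream
  .format("console")
  .start()
// On the first fetch after /tmp/q1-out/_spark_metadata appears, sourceHasMetadata becomes
// Some(true) and files are listed through MetadataLogFileIndex, so files q1 never committed
// (like the "should_not_keep.txt" case in the test below) are ignored.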
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
query.nonEmpty,
"Cannot add data when there is no query for finding the active file stream source")

val sources = query.get.logicalPlan.collect {
case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
source.asInstanceOf[FileStreamSource]
}
val sources = getSourcesFromStreamingQuery(query.get)
if (sources.isEmpty) {
throw new Exception(
"Could not find file source in the StreamExecution logical plan to add data to")
Contributor Author


This common logic is extracted out.

@@ -134,6 +131,14 @@ abstract class FileStreamSourceTest
}.head
}

protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
query.logicalPlan.collect {
case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
source.asInstanceOf[FileStreamSource]
}
}


protected def withTempDirs(body: (File, File) => Unit) {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -388,9 +393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("a", "b", "c", "d"),

AssertOnQuery("seen files should contain only one entry") { streamExecution =>
val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
e.source.asInstanceOf[FileStreamSource]
}.head
val source = getSourcesFromStreamingQuery(streamExecution).head
assert(source.seenFiles.size == 1)
true
}
@@ -662,6 +665,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("read data from outputs of another streaming query") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files
val q1Source = MemoryStream[String]
val q1 =
q1Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

// q2 is a streaming query that reads q1's text outputs
val q2 =
createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")

def q1AddData(data: String*): StreamAction =
Execute { _ =>
q1Source.addData(data)
q1.processAllAvailable()
}
def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }

testStream(q2)(
// batch 0
q1AddData("drop1", "keep2"),
q2ProcessAllAvailable(),
CheckAnswer("keep2"),

// batch 1
Assert {
// create a text file that won't be on q1's sink log
// thus even if its content contains "keep", it should NOT appear in q2's answer
val shouldNotKeep = new File(outputDir, "should_not_keep.txt")
stringToFile(shouldNotKeep, "should_not_keep!!!")
shouldNotKeep.exists()
},
q1AddData("keep3"),
q2ProcessAllAvailable(),
CheckAnswer("keep2", "keep3"),

// batch 2: check that things work well when the sink log gets compacted
q1AddData("keep4"),
Assert {
// compact interval is 3, so file "2.compact" should exist
new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
},
q2ProcessAllAvailable(),
CheckAnswer("keep2", "keep3", "keep4"),

Execute { _ => q1.stop() }
)
}
}
}

test("start before another streaming query, and read its output") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files
val q1Source = MemoryStream[String]
// define q1, but don't start it for now
val q1Write =
q1Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("text")
var q1: StreamingQuery = null

val q2 = createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")

testStream(q2)(
AssertOnQuery { q2 =>
val fileSource = getSourcesFromStreamingQuery(q2).head
// q1 has not started yet, verify that q2 doesn't know whether q1 has metadata
fileSource.sourceHasMetadata === None
},
Execute { _ =>
q1 = q1Write.start(outputDir.getCanonicalPath)
q1Source.addData("drop1", "keep2")
q1.processAllAvailable()
},
AssertOnQuery { q2 =>
q2.processAllAvailable()
val fileSource = getSourcesFromStreamingQuery(q2).head
// q1 has started, verify that q2 knows q1 has metadata by now
fileSource.sourceHasMetadata === Some(true)
},
CheckAnswer("keep2"),
Execute { _ => q1.stop() }
)
}
}

test("when schema inference is turned on, should read partition data") {
def createFile(content: String, src: File, tmp: File): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
@@ -755,10 +853,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
.streamingQuery
q.processAllAvailable()
val memorySink = q.sink.asInstanceOf[MemorySink]
val fileSource = q.logicalPlan.collect {
case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
source.asInstanceOf[FileStreamSource]
}.head
val fileSource = getSourcesFromStreamingQuery(q).head

/** Check the data read in the last batch */
def checkLastBatchData(data: Int*): Unit = {
@@ -208,6 +208,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}

/** Execute arbitrary code */
object Execute {
def apply(func: StreamExecution => Any): AssertOnQuery =
AssertOnQuery(query => { func(query); true })
}
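
A short usage sketch of the new helper (the `inputData` and `streamingDF` names are illustrative assumptions): Execute wraps arbitrary side-effecting code as an AssertOnQuery that always passes, so it can be interleaved with other StreamActions.

// Sketch only; `inputData` (a MemoryStream[String]) and `streamingDF` are assumed to be
// defined by the surrounding test.
testStream(streamingDF)(
  AddData(inputData, "a"),
  Execute { q => q.processAllAvailable() },  // run arbitrary code against the StreamExecution
  CheckAnswer("a")
)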

class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None

@@ -472,7 +478,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {

case a: AssertOnQuery =>
verify(currentStream != null || lastStream != null,
"cannot assert when not stream has been started")
"cannot assert when no stream has been started")
val streamToAssert = Option(currentStream).getOrElse(lastStream)
verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
