@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.execution.streaming.FileStreamSink
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -36,20 +37,28 @@ import org.apache.spark.util.SerializableConfiguration
* A [[FileIndex]] that generates the list of files to process by recursively listing all the
* files present in `paths`.
*
* @param rootPaths the list of root table paths to scan
* @param rootPathsSpecified the list of root table paths to scan (some of which might be
*                           filtered out later)
* @param parameters a set of options to control discovery
* @param partitionSchema an optional partition schema that will be used to provide types for the
*                        discovered partitions
*/
class InMemoryFileIndex(
sparkSession: SparkSession,
override val rootPaths: Seq[Path],
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
extends PartitioningAwareFileIndex(
sparkSession, parameters, partitionSchema, fileStatusCache) {

// Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
// or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
// such streaming metadata dirs or files, e.g. after globbing "basePath/*" where "basePath"
// is the output of a streaming query.
override val rootPaths =
rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))

@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
@volatile private var cachedPartitionSpec: PartitionSpec = _
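To make the new filtering concrete, here is a minimal sketch of what the `rootPaths` filter drops. The paths below are hypothetical, and a default Hadoop Configuration (local file system) is assumed:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.FileStreamSink

// Hypothetical result of globbing "basePath/*" over a streaming sink's output directory.
val globbed = Seq(
  new Path("/tmp/stream-out/part-00000"),         // data file: kept
  new Path("/tmp/stream-out/_spark_metadata"),    // sink metadata dir: filtered out
  new Path("/tmp/stream-out/_spark_metadata/0"))  // file inside the metadata dir: filtered out

val hadoopConf = new Configuration()
val rootPaths = globbed.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))
// rootPaths now contains only /tmp/stream-out/part-00000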
@@ -53,6 +53,26 @@ object FileStreamSink extends Logging {
case _ => false
}
}

/**
* Returns true if the path is the metadata dir or its ancestor is the metadata dir.
* E.g.:
* - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
* - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
* - ancestorIsMetadataDirectory(/a/b/c) => false
*/
def ancestorIsMetadataDirectory(path: Path, hadoopConf: Configuration): Boolean = {
val fs = path.getFileSystem(hadoopConf)
var currentPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
while (currentPath != null) {
if (currentPath.getName == FileStreamSink.metadataDir) {
return true
} else {
currentPath = currentPath.getParent
}
}
return false
}
}

/**
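For intuition, a short trace of how the loop above walks a path upward (the example paths are hypothetical; the root path's getName is the empty string and its parent is null):

// ancestorIsMetadataDirectory("/a/b/_spark_metadata/c", conf)
//   "/a/b/_spark_metadata/c"  -> getName == "c"               -> move to parent
//   "/a/b/_spark_metadata"    -> getName == "_spark_metadata" -> return true
//
// ancestorIsMetadataDirectory("/a/b/c", conf)
//   "/a/b/c" -> "/a/b" -> "/a" -> "/" -> parent is null       -> return false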
@@ -395,7 +395,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi

val fileCatalog = new InMemoryFileIndex(
sparkSession = spark,
rootPaths = Seq(new Path(tempDir)),
rootPathsSpecified = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
partitionSchema = None)
// This should not fail.
@@ -19,10 +19,12 @@ package org.apache.spark.sql.streaming

import java.util.Locale

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
@@ -145,6 +147,43 @@ class FileStreamSinkSuite extends StreamTest {
}
}

test("partitioned writing and batch reading with 'basePath'") {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
val outputPath = outputDir.getAbsolutePath
val inputData = MemoryStream[Int]
val ds = inputData.toDS()

var query: StreamingQuery = null

try {
query =
ds.map(i => (i, -i, i * 1000))
.toDF("id1", "id2", "value")
.writeStream
.partitionBy("id1", "id2")
.option("checkpointLocation", checkpointDir.getAbsolutePath)
.format("parquet")
.start(outputPath)

inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}

val readIn = spark.read.option("basePath", outputPath).parquet(s"$outputDir/*/*")
checkDatasetUnorderly(
readIn.as[(Int, Int, Int)],
(1000, 1, -1), (2000, 2, -2), (3000, 3, -3))
} finally {
if (query != null) {
query.stop()
}
}
}
}
}

// This tests whether FileStreamSink works with aggregations. Specifically, it tests
// whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used to
// execute the trigger for writing data to file sink. See SPARK-18440 for more details.
@@ -266,4 +305,22 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}

test("FileStreamSink.ancestorIsMetadataDirectory()") {
val hadoopConf = spark.sparkContext.hadoopConfiguration
def assertAncestorIsMetadataDirectory(path: String): Unit =
assert(FileStreamSink.ancestorIsMetadataDirectory(new Path(path), hadoopConf))
def assertAncestorIsNotMetadataDirectory(path: String): Unit =
assert(!FileStreamSink.ancestorIsMetadataDirectory(new Path(path), hadoopConf))

assertAncestorIsMetadataDirectory(s"/${FileStreamSink.metadataDir}")
assertAncestorIsMetadataDirectory(s"/${FileStreamSink.metadataDir}/")
assertAncestorIsMetadataDirectory(s"/a/${FileStreamSink.metadataDir}")
assertAncestorIsMetadataDirectory(s"/a/${FileStreamSink.metadataDir}/")
assertAncestorIsMetadataDirectory(s"/a/b/${FileStreamSink.metadataDir}/c")
assertAncestorIsMetadataDirectory(s"/a/b/${FileStreamSink.metadataDir}/c/")

assertAncestorIsNotMetadataDirectory(s"/a/b/c")
assertAncestorIsNotMetadataDirectory(s"/a/b/c/${FileStreamSink.metadataDir}extra")
}
}