diff --git a/docs/sql-data-sources-binaryFile.md b/docs/sql-data-sources-binaryFile.md
index d861a24219be..0d41c9e441c6 100644
--- a/docs/sql-data-sources-binaryFile.md
+++ b/docs/sql-data-sources-binaryFile.md
@@ -28,21 +28,9 @@ It produces a DataFrame with the following columns and possibly partition column
 * `path`: StringType
 * `modificationTime`: TimestampType
 * `length`: LongType
 * `content`: BinaryType
 
-It supports the following read option:
-<table class="table">
-  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-  <tr>
-    <td><code>pathGlobFilter</code></td>
-    <td>none (accepts all)</td>
-    <td>
-      An optional glob pattern to only include files with paths matching the pattern.
-      The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
-      It does not change the behavior of partition discovery.
-    </td>
-  </tr>
-</table>
-
 To read whole binary files, you need to specify the data source `format` as `binaryFile`.
+To load files with paths matching a given glob pattern while keeping the behavior of partition discovery,
+you can use the general data source option `pathGlobFilter`.
 For example, the following code reads all PNG files from the input directory:
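As a quick illustration of the documented behavior, a minimal sketch of the binaryFile read with the now-general option, assuming an active `SparkSession` named `spark`; the directory path is illustrative:

```scala
// Read whole binary files, keeping only the ones whose names match *.png.
// The option is handled by the shared file index, so partition discovery
// over the directory layout is unchanged.
val pngDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/path/to/fileDir")
```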
diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md index a7efb9347ac6..07482137a28a 100644 --- a/docs/sql-data-sources-load-save-functions.md +++ b/docs/sql-data-sources-load-save-functions.md @@ -102,6 +102,27 @@ To load a CSV file you can use:
+To load files with paths matching a given glob pattern while keeping the behavior of partition discovery,
+you can use:
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% include_example load_with_path_glob_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example load_with_path_glob_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example load_with_path_glob_filter python/sql/datasource.py %}
+</div>
+
+<div data-lang="r" markdown="1">
+{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %}
+</div>
+</div>
+ The extra options are also used during write operation. For example, you can control bloom filters and dictionary encodings for ORC data sources. The following ORC example will create bloom filter and use dictionary encoding only for `favorite_color`. diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index cbe9dfdaa907..b2ce0bc08642 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -123,6 +123,11 @@ private static void runBasicDataSourceExample(SparkSession spark) { .option("header", "true") .load("examples/src/main/resources/people.csv"); // $example off:manual_load_options_csv$ + // $example on:load_with_path_glob_filter$ + Dataset partitionedUsersDF = spark.read().format("orc") + .option("pathGlobFilter", "*.orc") + .load("examples/src/main/resources/partitioned_users.orc"); + // $example off:load_with_path_glob_filter$ // $example on:manual_save_options_orc$ usersDF.write().format("orc") .option("orc.bloom.filter.columns", "favorite_color") diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index 04660724b308..0d78097ea975 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -57,6 +57,11 @@ def basic_datasource_example(spark): format="csv", sep=":", inferSchema="true", header="true") # $example off:manual_load_options_csv$ + # $example on:load_with_path_glob_filter$ + df = spark.read.load("examples/src/main/resources/partitioned_users.orc", + format="orc", pathGlobFilter="*.orc") + # $example off:load_with_path_glob_filter$ + # $example on:manual_save_options_orc$ df = spark.read.orc("examples/src/main/resources/users.orc") (df.write.format("orc") diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 196a110f351c..fa083d5542fa 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -118,6 +118,10 @@ df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferS namesAndAges <- select(df, "name", "age") # $example off:manual_load_options_csv$ +# $example on:load_with_path_glob_filter$ +df <- read.df("examples/src/main/resources/partitioned_users.orc", "orc", pathGlobFilter = "*.orc") +# $example off:load_with_path_glob_filter$ + # $example on:manual_save_options_orc$ df <- read.df("examples/src/main/resources/users.orc", "orc") write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name") diff --git a/examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt b/examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt new file mode 100644 index 000000000000..9c19f2a0449e --- /dev/null +++ b/examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt @@ -0,0 +1 @@ +do not read this diff --git a/examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc b/examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc new file mode 100644 index 000000000000..890395a9281a Binary files /dev/null and b/examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc differ 
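The `partitioned_users.orc` fixture added above exists to show the option against a partitioned layout. A hedged sketch of what the new examples do with it, assuming an active `SparkSession` named `spark` and a working directory at the Spark source root so the resource path resolves:

```scala
// Only files matching *.orc are listed, so the stray do_not_read_this.txt is
// skipped, while favorite_color is still inferred as a partition column:
// the filter applies to file names, not to the partition directories.
val partitionedUsersDF = spark.read.format("orc")
  .option("pathGlobFilter", "*.orc")
  .load("examples/src/main/resources/partitioned_users.orc")
partitionedUsersDF.printSchema()  // includes the favorite_color partition column
```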
diff --git a/examples/src/main/resources/partitioned_users.orc/favorite_color=red/users.orc b/examples/src/main/resources/partitioned_users.orc/favorite_color=red/users.orc new file mode 100644 index 000000000000..150615a6f3b2 Binary files /dev/null and b/examples/src/main/resources/partitioned_users.orc/favorite_color=red/users.orc differ diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index 18615d9b9b90..c7b6a50f0ae7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -56,6 +56,11 @@ object SQLDataSourceExample { .option("header", "true") .load("examples/src/main/resources/people.csv") // $example off:manual_load_options_csv$ + // $example on:load_with_path_glob_filter$ + val partitionedUsersDF = spark.read.format("orc") + .option("pathGlobFilter", "*.orc") + .load("examples/src/main/resources/partitioned_users.orc") + // $example off:load_with_path_glob_filter$ // $example on:manual_save_options_orc$ usersDF.write.format("orc") .option("orc.bloom.filter.columns", "favorite_color") diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index c2a7f3175943..be8223ccc9df 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -57,6 +57,10 @@ private[avro] class AvroFileFormat extends FileFormat options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { val conf = spark.sessionState.newHadoopConf() + if (options.contains("ignoreExtension")) { + logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " + + "general data source option pathGlobFilter for filtering file names.") + } val parsedOptions = new AvroOptions(options, conf) // User can specify an optional avro json schema. diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index fec17bfff542..338244aa9e53 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -59,6 +59,7 @@ class AvroOptions( * If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension` * is taken into account. If the former one is not set too, file extensions are ignored. 
*/ + @deprecated("Use the general data source option pathGlobFilter for filtering file names", "3.0") val ignoreExtension: Boolean = { val ignoreFilesWithoutExtensionByDefault = false val ignoreFilesWithoutExtension = conf.getBoolean( @@ -66,7 +67,7 @@ class AvroOptions( ignoreFilesWithoutExtensionByDefault) parameters - .get("ignoreExtension") + .get(AvroOptions.ignoreExtensionKey) .map(_.toBoolean) .getOrElse(!ignoreFilesWithoutExtension) } @@ -93,4 +94,6 @@ object AvroOptions { .getOrElse(new Configuration()) new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf) } + + val ignoreExtensionKey = "ignoreExtension" } diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1353a0f6e910..6413d88d1dcf 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -120,6 +120,9 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. """ self._jreader = self._jreader.option(key, to_str(value)) return self @@ -132,6 +135,9 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. """ for k in options: self._jreader = self._jreader.option(k, to_str(options[k])) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index d15779bc4725..b100cd1acd36 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -341,6 +341,9 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. .. note:: Evolving. @@ -357,6 +360,9 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. .. note:: Evolving. 
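The docstrings above stress that `pathGlobFilter` does not change partition discovery, which is what distinguishes it from putting a glob directly in the load path. A rough Scala sketch of that contrast, assuming data written with `df.write.partitionBy("b").text("/tmp/globdemo")` (the path and column name are illustrative; the new test in FileBasedDataSourceSuite below exercises the same scenario):

```scala
// Glob in the path: the matched leaf directories become the root paths,
// so the partition column `b` is not inferred (schema: value).
val noPartitionCol = spark.read.text("/tmp/globdemo/*/*.txt")

// pathGlobFilter: files are filtered while listing under the original root,
// so `b` is still discovered (schema: value, b).
val withPartitionCol = spark.read
  .option("pathGlobFilter", "*.txt")
  .text("/tmp/globdemo")
```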
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 8460c7902e7d..dfc6d8ce96a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -98,6 +98,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * * @since 1.4.0 @@ -135,6 +138,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * * @since 1.4.0 @@ -151,6 +157,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * * @since 1.4.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 52f30acd4822..ef430f408b54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -348,7 +348,8 @@ case class DataSource( sparkSession.sessionState.newHadoopConf(), sparkSession.sessionState.conf) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema) + val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, + caseInsensitiveOptions, userSpecifiedSchema) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 29b304a1e487..3c932555179f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -56,6 +56,12 @@ abstract class PartitioningAwareFileIndex( protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] + protected lazy val pathGlobFilter = parameters.get("pathGlobFilter").map(new GlobFilter(_)) + + protected def matchGlobPattern(file: FileStatus): Boolean = { + pathGlobFilter.forall(_.accept(file.getPath)) + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { @@ -69,7 +75,7 @@ abstract class PartitioningAwareFileIndex( val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { case Some(existingDir) => // Directory has children files in it, return them - existingDir.filter(isNonEmptyFile) + existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f)) case None => // Directory does not exist, or has no children files @@ -89,7 +95,7 @@ abstract class PartitioningAwareFileIndex( override def sizeInBytes: Long = allFiles().map(_.getLen).sum def allFiles(): Seq[FileStatus] = { - if (partitionSpec().partitionColumns.isEmpty) { + val files = if (partitionSpec().partitionColumns.isEmpty) { // For each of the root input paths, get the list of files inside them rootPaths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles). 
@@ -118,6 +124,7 @@ abstract class PartitioningAwareFileIndex( } else { leafFiles.values.toSeq } + files.filter(matchGlobPattern) } protected def inferPartitioning(): PartitionSpec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index 263778455493..cdc7cd53c8b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -47,12 +47,10 @@ import org.apache.spark.util.SerializableConfiguration * {{{ * // Scala * val df = spark.read.format("binaryFile") - * .option("pathGlobFilter", "*.png") * .load("/path/to/fileDir") * * // Java * Dataset df = spark.read().format("binaryFile") - * .option("pathGlobFilter", "*.png") * .load("/path/to/fileDir"); * }}} */ @@ -98,44 +96,37 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val binaryFileSourceOptions = new BinaryFileSourceOptions(options) - val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter val filterFuncs = filters.map(filter => createFilterFunction(filter)) val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) file: PartitionedFile => { val path = new Path(file.filePath) - // TODO: Improve performance here: each file will recompile the glob pattern here. - if (pathGlobPattern.forall(new GlobFilter(_).accept(path))) { - val fs = path.getFileSystem(broadcastedHadoopConf.value.value) - val status = fs.getFileStatus(path) - if (filterFuncs.forall(_.apply(status))) { - val writer = new UnsafeRowWriter(requiredSchema.length) - writer.resetRowWriter() - requiredSchema.fieldNames.zipWithIndex.foreach { - case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString)) - case (LENGTH, i) => writer.write(i, status.getLen) - case (MODIFICATION_TIME, i) => - writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime)) - case (CONTENT, i) => - if (status.getLen > maxLength) { - throw new SparkException( - s"The length of ${status.getPath} is ${status.getLen}, " + - s"which exceeds the max length allowed: ${maxLength}.") - } - val stream = fs.open(status.getPath) - try { - writer.write(i, ByteStreams.toByteArray(stream)) - } finally { - Closeables.close(stream, true) - } - case (other, _) => - throw new RuntimeException(s"Unsupported field name: ${other}") - } - Iterator.single(writer.getRow) - } else { - Iterator.empty + val fs = path.getFileSystem(broadcastedHadoopConf.value.value) + val status = fs.getFileStatus(path) + if (filterFuncs.forall(_.apply(status))) { + val writer = new UnsafeRowWriter(requiredSchema.length) + writer.resetRowWriter() + requiredSchema.fieldNames.zipWithIndex.foreach { + case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString)) + case (LENGTH, i) => writer.write(i, status.getLen) + case (MODIFICATION_TIME, i) => + writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime)) + case (CONTENT, i) => + if (status.getLen > maxLength) { + throw new SparkException( + s"The length of ${status.getPath} is ${status.getLen}, " + + s"which exceeds the max length allowed: ${maxLength}.") + } + val stream = fs.open(status.getPath) + try { + writer.write(i, ByteStreams.toByteArray(stream)) + } finally { 
+ Closeables.close(stream, true) + } + case (other, _) => + throw new RuntimeException(s"Unsupported field name: ${other}") } + Iterator.single(writer.getRow) } else { Iterator.empty } @@ -204,14 +195,3 @@ object BinaryFileFormat { } } -class BinaryFileSourceOptions( - @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { - - def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) - - /** - * An optional glob pattern to only include files with paths matching the pattern. - * The syntax follows [[org.apache.hadoop.fs.GlobFilter]]. - */ - val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter") -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index cef814b5b6d2..67e26dc1a2db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType @@ -195,7 +196,8 @@ class FileStreamSource( private def allFilesUsingMetadataLogFileIndex() = { // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a // non-glob path - new MetadataLogFileIndex(sparkSession, qualifiedBasePath, None).allFiles() + new MetadataLogFileIndex(sparkSession, qualifiedBasePath, + CaseInsensitiveMap(options), None).allFiles() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala index 80eed7b27721..6eaccfb6d934 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.types.StructType class MetadataLogFileIndex( sparkSession: SparkSession, path: Path, + parameters: Map[String, String], userSpecifiedSchema: Option[StructType]) - extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) { + extends PartitioningAwareFileIndex(sparkSession, parameters, userSpecifiedSchema) { private val metadataDirectory = { val metadataDir = new Path(path, FileStreamSink.metadataDir) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 01083a994e8a..bb536b6fee47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -83,6 +83,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * * * @since 2.0.0 @@ -120,6 +123,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * * * @since 2.0.0 @@ -136,6 +142,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * * * @since 2.0.0 diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c6fdf41ca7d9..3ab5bf26157a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -539,6 +539,38 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } + test("Option pathGlobFilter: filter files correctly") { + withTempPath { path => + val dataDir = path.getCanonicalPath + Seq("foo").toDS().write.text(dataDir) + Seq("bar").toDS().write.mode("append").orc(dataDir) + val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir) + checkAnswer(df, Row("foo")) + + // Both glob pattern in option and path should be effective to filter files. + val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc") + checkAnswer(df2, Seq.empty) + + val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt") + checkAnswer(df3, Row("foo")) + } + } + + test("Option pathGlobFilter: simple extension filtering should contains partition info") { + withTempPath { path => + val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b") + input.write.partitionBy("b").text(path.getCanonicalPath) + Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1") + + // If we use glob pattern in the path, the partition column won't be shown in the result. + val df = spark.read.text(path.getCanonicalPath + "/*/*.txt") + checkAnswer(df, input.select("a")) + + val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath) + checkAnswer(df2, input) + } + } + test("Return correct results when data columns overlap with partition columns") { Seq("parquet", "orc", "json").foreach { format => withTempPath { path => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 4b0bab173ae9..2b8d77386925 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -483,6 +483,25 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("Option pathGlobFilter") { + val testTableName = "FileStreamSourceTest" + withTable(testTableName) { + withTempPath { output => + Seq("foo").toDS().write.text(output.getCanonicalPath) + Seq("bar").toDS().write.mode("append").orc(output.getCanonicalPath) + val df = spark.readStream.option("pathGlobFilter", "*.txt") + .format("text").load(output.getCanonicalPath) + val query = df.writeStream.format("memory").queryName(testTableName).start() + try { + query.processAllAvailable() + checkDatasetUnorderly(spark.table(testTableName).as[String], "foo") + } finally { + query.stop() + } + } + } + } + test("read from textfile") { withTempDirs { case (src, tmp) => val textStream = spark.readStream.textFile(src.getCanonicalPath)