@@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {

// Single column of images named "image"
private lazy val imagePath = "../data/mllib/images/partitioned"
private lazy val recursiveImagePath = "../data/mllib/images"

test("image datasource count test") {
val df1 = spark.read.format("image").load(imagePath)
@@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex(
pathGlobFilter.forall(_.accept(file.getPath))
}

protected lazy val recursiveFileLookup = {
parameters.getOrElse("recursiveFileLookup", "false").toBoolean
Contributor: shall we document the option in DataFrameReader?

Member: @Ngone51 Could you submit a follow-up PR to document this? This affects all the built-in file sources. We need to update the documentation of both PySpark and Scala APIs.

Contributor: FYI, there is a Jira about adding this documentation which you will want to reference: SPARK-29903

Member: @nchammas Could you submit a PR to fix readwriter.py for supporting this new option?

Contributor: Sure, will do. I suppose we'll do that separately from adding the docs, which will get their own PR, correct?

Member: Guys, we should also update DataStreamReader and streaming.py.

Member: Ok, I'll submit a PR to document it. @gatorsmile
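Editor's note, for context on the documentation being discussed: a minimal sketch of how a user would set the new option. The batch half mirrors the tests added later in this PR; the streaming half assumes DataStreamReader ends up exposing the same option, as proposed above. Paths and the app name are illustrative.

import org.apache.spark.sql.SparkSession

object RecursiveLookupUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("recursiveFileLookup-example")
      .master("local[*]")
      .getOrCreate()

    // Batch read: list every file under /data/logs, recursing into
    // subdirectories and ignoring any partition directory layout.
    val df = spark.read
      .format("text")
      .option("recursiveFileLookup", "true")
      .load("/data/logs")
    df.show(5)

    // Streaming read: assumed to work the same way once DataStreamReader
    // and streaming.py are updated, per the review discussion.
    val streamDf = spark.readStream
      .format("text")
      .option("recursiveFileLookup", "true")
      .load("/data/incoming")

    spark.stop()
  }
}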

}

override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex(
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
} else {
if (recursiveFileLookup) {
Member: This branch seems not reachable. Should we simply use assert here?

Contributor Author: Oh, it is reachable, I think. See class PrunedInMemoryFileIndex, which explicitly sets partitionSpec.
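Editor's illustration of the reachability point above: a small, self-contained sketch with simplified, hypothetical names (not Spark's actual classes). The guard only matters when partitionSpec() reports partition columns; an index that is handed an explicit, non-empty spec, which is the role PrunedInMemoryFileIndex plays, can therefore reach this branch even though inferPartitioning() returns an empty spec when recursiveFileLookup is set.

// Base class: checks the guard only when partition columns are present,
// mirroring the structure of listFiles above.
abstract class IndexSketch(recursiveFileLookup: Boolean) {
  // Stands in for partitionSpec().partitionColumns.
  def partitionColumns: Seq[String]

  def listFilesSketch(): Unit = {
    if (partitionColumns.nonEmpty && recursiveFileLookup) {
      throw new IllegalArgumentException(
        "Datasource with partition do not allow recursive file loading.")
    }
    // ... normal listing would continue here ...
  }
}

// Analogue of PrunedInMemoryFileIndex: the partition spec is supplied up
// front rather than inferred from the directory layout.
class ExplicitSpecIndexSketch(columns: Seq[String], recursive: Boolean)
    extends IndexSketch(recursive) {
  override def partitionColumns: Seq[String] = columns
}

object ReachabilityDemo {
  def main(args: Array[String]): Unit = {
    // Throws IllegalArgumentException: the spec is non-empty and recursive
    // lookup is enabled, so the guard fires.
    new ExplicitSpecIndexSketch(Seq("year"), recursive = true).listFilesSketch()
  }
}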

throw new IllegalArgumentException(
"Datasource with partition do not allow recursive file loading.")
}
prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
@@ -95,7 +103,7 @@
override def sizeInBytes: Long = allFiles().map(_.getLen).sum

def allFiles(): Seq[FileStatus] = {
val files = if (partitionSpec().partitionColumns.isEmpty) {
val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) {
// For each of the root input paths, get the list of files inside them
rootPaths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
@@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex(
}

protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq

val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)

PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths,
userSpecifiedSchema = userSpecifiedSchema,
caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
timeZoneId = timeZoneId)
if (recursiveFileLookup) {
PartitionSpec.emptySpec
} else {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq

val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)

PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths,
userSpecifiedSchema = userSpecifiedSchema,
caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
timeZoneId = timeZoneId)
}
}

private def prunePartitions(
@@ -18,6 +18,7 @@
package org.apache.spark.sql

import java.io.{File, FilenameFilter, FileNotFoundException}
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale

import scala.collection.mutable
@@ -572,6 +573,75 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}

test("Option recursiveFileLookup: recursive loading correctly") {

val expectedFileList = mutable.ListBuffer[String]()

def createFile(dir: File, fileName: String, format: String): Unit = {
val path = new File(dir, s"${fileName}.${format}")
Files.write(
path.toPath,
s"content of ${path.toString}".getBytes,
StandardOpenOption.CREATE, StandardOpenOption.WRITE
)
val fsPath = new Path(path.getAbsoluteFile.toURI).toString
expectedFileList.append(fsPath)
}

def createDir(path: File, dirName: String, level: Int): Unit = {
val dir = new File(path, s"dir${dirName}-${level}")
dir.mkdir()
createFile(dir, s"file${level}", "bin")
createFile(dir, s"file${level}", "text")

if (level < 4) {
// create sub-dir
createDir(dir, "sub0", level + 1)
createDir(dir, "sub1", level + 1)
}
}

withTempPath { path =>
path.mkdir()
createDir(path, "root", 0)

val dataPath = new File(path, "dirroot-0").getAbsolutePath
val fileList = spark.read.format("binaryFile")
.option("recursiveFileLookup", true)
.load(dataPath)
.select("path").collect().map(_.getString(0))

assert(fileList.toSet === expectedFileList.toSet)

val fileList2 = spark.read.format("binaryFile")
.option("recursiveFileLookup", true)
.option("pathGlobFilter", "*.bin")
.load(dataPath)
.select("path").collect().map(_.getString(0))

assert(fileList2.toSet === expectedFileList.filter(_.endsWith(".bin")).toSet)
}
}

test("Option recursiveFileLookup: disable partition inferring") {
val dataPath = Thread.currentThread().getContextClassLoader
.getResource("test-data/text-partitioned").toString

val df = spark.read.format("binaryFile")
.option("recursiveFileLookup", true)
.load(dataPath)

assert(!df.columns.contains("year"), "Expect partition inferring disabled")
val fileList = df.select("path").collect().map(_.getString(0))

val expectedFileList = Array(
dataPath + "/year=2014/data.txt",
dataPath + "/year=2015/data.txt"
).map(path => new Path(path).toString)

assert(fileList.toSet === expectedFileList.toSet)
}

test("Return correct results when data columns overlap with partition columns") {
Seq("parquet", "orc", "json").foreach { format =>
withTempPath { path =>