Skip to content

Conversation

@maropu
Copy link
Member

@maropu maropu commented Jul 4, 2016

What changes were proposed in this pull request?

This pr is to add an interface for filtering files in FileFormat not to pass invalid files into FileFormat#buildReader.

How was this patch tested?

Added tests to filter files in a driver and in parallel.

@SparkQA
Copy link

SparkQA commented Jul 4, 2016

Test build #61701 has finished for PR 14038 at commit 6770309.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Copy link
Member Author

maropu commented Jul 4, 2016

@liancheng Could you review this after v2.0 released?

@maropu maropu closed this Jul 4, 2016
@maropu maropu reopened this Jul 4, 2016
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add either the data source options map or the Hadoop conf as an argument of this method?

For example, the Avro data source may filter out all input files whose file names don't end with ".avro" if Hadoop conf "avro.mapred.ignore.inputs.without.extension" is set to true. This is consistent with default behavior of AvroInputFormat.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I'll fix now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the semantics of the return value of the method? Seems that it should never return a null filter since it defaults to an "accept all" filter. If this is true, it's unnecessary to use Option to wrap returned filters elsewhere in this PR.

Copy link
Member Author

@maropu maropu Jul 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, my bad. I'll re-check the whole code to remove Option.

@liancheng
Copy link
Contributor

Left some comments, the overall structure looks good. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be conciser:

(filter1 ++ filter2).reduceOption { (f1, f2) =>
  (path: Path) => f1.accept(path) && f2.accept(path)
}.getOrElse {
  (path: Path) => true
}

@SparkQA
Copy link

SparkQA commented Jul 5, 2016

Test build #61750 has finished for PR 14038 at commit f032a4e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Copy link
Member Author

maropu commented Jul 5, 2016

@liancheng okay, re-check please.

Copy link
Contributor

@liancheng liancheng Jul 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just rename it as PathFilter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it probably makes more sense to move this class into fileSourceInterfaces.scala since it's part of the public interface.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, fixed.

@liancheng
Copy link
Contributor

cc @rxin

@maropu
Copy link
Member Author

maropu commented Jul 6, 2016

okay, updated.

@SparkQA
Copy link

SparkQA commented Jul 6, 2016

Test build #61846 has finished for PR 14038 at commit 5115d26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class PathFilter extends Serializable

@maropu
Copy link
Member Author

maropu commented Jul 11, 2016

ping @rxin

@SparkQA
Copy link

SparkQA commented Aug 19, 2016

Test build #64049 has finished for PR 14038 at commit d53ad8e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class PathFilter extends Serializable

@SparkQA
Copy link

SparkQA commented Aug 19, 2016

Test build #64062 has finished for PR 14038 at commit 60f05ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 19, 2016

Test build #64075 has finished for PR 14038 at commit c3e046f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Copy link
Member Author

maropu commented Aug 20, 2016

ping @rxin @liancheng

@steveloughran
Copy link
Contributor

Path filtering in Hadoop FS calls on anything other than filename is very suboptimal; in #14731 you can see where the filtering has been postoned until after the listing, when the full FileStatus entry list has been returned.

As filtering is the last operation in the various listFiles calls, there's no penalty to doing the filtering after the results come in. In FileSytem.globStatus() the filtering takes place after the glob match, but during the scan...a larger list will be built and returned, but that is all.

I think a new filter should be executed after these operations, taking the FileStatus object, this provides a superset of filtering possible within the Hadoop calls (timestamp, filetype, ...), with no performance penalty. It's more flexible than the simple accept(path), and will guarantee that nobody using the API will implement a suboptimal filter.

Consider also taking a predicate Filesystem => Boolean, rather than requiring callers to implement new classes. It can be fed straight into Iterator.filter().

I note you are making extensive use of listLeafFiles; that's a potentially inefficent implementation against object stores. Keep using it —I'll patch it to use FileSystem.listFiles(path, true) for in FS recursion and O(files/5000) listing against S3A in Hadoop 2.8; eventually Azure and swoft

@maropu
Copy link
Member Author

maropu commented Aug 22, 2016

@steveloughran Thank for the comment and good suggestion. Seems you'd better off opening a new JIRA ticket to discuss more there. btw, do you know how the recursion you pointed out makes big impacts on actual performance? Could you have any performance results for that?

@steveloughran
Copy link
Contributor

Oh, i don't want to take on any more work...I just think you should make the predicate passed in something that goes FileStatus => Boolean instead of String => Boolean, and doing the filtering after the results come back.

Regarding speedup, we've seen 20x in simple test trees, but don't have real data on how representative that is: HADOOP-13208

@maropu
Copy link
Member Author

maropu commented Aug 22, 2016

If my understanding is correct, PathFilter is not passed into FileSystem.listFiles in ListingFileCatalog#listLeafFiles inside. If even so, the performance degrades you pointed out occur?

@steveloughran
Copy link
Contributor

There's no performance problem from filtering just on names. It's when people try to filter on more complex things (file type, timestamp) they need to call getFileStatus(path) and that's the performance problem.

I've been through Spark looking at where anything like that is being done, and have a patch to fix it....i don't want to have to do the same thing again in future; something we can avoid by having a richer filter which passes the FileStatus generated in the listing process.

Now, you may think "why doesn't Hadoop's list/glob operations take a richer predicate?". That I can't answer, it's history is lost in the oldest bits of the code.

@maropu
Copy link
Member Author

maropu commented Aug 23, 2016

Understood though, it seems this is a difficult issue because I'm not 100% sure how rich we should need for the filter interface (at least timestamp and file type is not used for now when filtering files in listingFileCatalog#listLeafFiles) and FileStatus adds a hadoop dependency. What do u think? cc: @rxin @liancheng If reasonable, I'll change the interface to FileStatus=>Boolean.

@maropu
Copy link
Member Author

maropu commented Nov 18, 2016

@liancheng I'm not sure that the original motivation keeps alive in SPARK-16317 though, if I need to do something, please let me know. I made new code based on this pr (master...maropu:SPARK-16317-2) because I found the file listing class had been refactored recently. Thanks!

@steveloughran
Copy link
Contributor

@maropu if you create a PR for your work I'll comment on it

@rxin
Copy link
Contributor

rxin commented Nov 21, 2016

Do we have a strong need for this? Anything wrong with just filtering out all file names that start with _ and .?

@maropu
Copy link
Member Author

maropu commented Nov 21, 2016

yea, as for data files, it's okay to filter out '_' and '.'. But, the file pattens of metadata depend on file formats as suggested in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L433

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider adding the full set of invalid files:

p1=2/file=3 -> 1
p1=2/.temp -> 1

@SparkQA
Copy link

SparkQA commented Jan 15, 2017

Test build #71384 has finished for PR 14038 at commit 4e3628b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

@SparkQA
Copy link

SparkQA commented Jan 15, 2017

Test build #71388 has finished for PR 14038 at commit 85b0f61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

@SparkQA
Copy link

SparkQA commented Jan 15, 2017

Test build #71389 has started for PR 14038 at commit d08ff73.

@maropu
Copy link
Member Author

maropu commented Jan 15, 2017

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jan 15, 2017

Test build #71391 has finished for PR 14038 at commit d08ff73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

@maropu
Copy link
Member Author

maropu commented Jan 15, 2017

@liancheng Could you check this?

@SparkQA
Copy link

SparkQA commented Jan 24, 2017

Test build #71899 has finished for PR 14038 at commit 9284827.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

@maropu
Copy link
Member Author

maropu commented Jan 24, 2017

@liancheng ping

2 similar comments
@maropu
Copy link
Member Author

maropu commented Feb 4, 2017

@liancheng ping

@maropu
Copy link
Member Author

maropu commented Feb 13, 2017

@liancheng ping

@SparkQA
Copy link

SparkQA commented Mar 21, 2017

Test build #74927 has finished for PR 14038 at commit c89a876.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

@SparkQA
Copy link

SparkQA commented Mar 21, 2017

Test build #74964 has finished for PR 14038 at commit b405861.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

@maropu
Copy link
Member Author

maropu commented Mar 24, 2017

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Mar 24, 2017

Test build #75132 has finished for PR 14038 at commit b405861.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class PathFilter extends Serializable
  • class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)

// 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
// should skip this file in case of double reading.
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith(".") ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like @rxin said, this sounds risky to me too.

@gatorsmile
Copy link
Member

Could we first close this PR? We can revisit it later?

@maropu
Copy link
Member Author

maropu commented May 23, 2017

@gatorsmile ok, I'll close this for now. Thanks!

@maropu maropu closed this May 23, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants