
Conversation

@guykhazma
Contributor

What changes were proposed in this pull request?

Follow-up on SPARK-30428, which added support for partition pruning in File source V2.
This PR implements the necessary changes to pass the `dataFilters` to `listFiles`. This enables `FileIndex` implementations which use the `dataFilters` for further pruning the file listing (see the discussion here).

Why are the changes needed?

Datasources such as `csv` and `json` do not implement the `SupportsPushDownFilters` trait. In order to support data skipping uniformly for all file-based data sources, one can override the `listFiles` method in a `FileIndex` implementation, which consults external metadata and prunes the list of files.
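For illustration, such an override might look like the following minimal sketch. It assumes Spark's internal `FileIndex` API; `SkippingFileIndex` and its `mayContainMatches` metadata lookup are hypothetical names for this example, not part of this PR:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

// Hypothetical FileIndex wrapper: delegates the listing, then uses the
// dataFilters plus external metadata to drop files that cannot match.
class SkippingFileIndex(
    delegate: FileIndex,
    // External metadata lookup: may this file contain rows matching the filters?
    mayContainMatches: (Path, Seq[Expression]) => Boolean)
  extends FileIndex {

  override def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    val partitions = delegate.listFiles(partitionFilters, dataFilters)
    if (dataFilters.isEmpty) partitions
    else partitions.map { pd =>
      // Keep only files whose external metadata says they may contain matches.
      PartitionDirectory(
        pd.values,
        pd.files.filter(f => mayContainMatches(f.getPath, dataFilters)))
    }
  }

  // Everything else simply delegates to the wrapped index.
  override def rootPaths: Seq[Path] = delegate.rootPaths
  override def inputFiles: Array[String] = delegate.inputFiles
  override def refresh(): Unit = delegate.refresh()
  override def sizeInBytes: Long = delegate.sizeInBytes
  override def partitionSchema: StructType = delegate.partitionSchema
}
```

Because `PartitioningAwareFileIndex` ignores the `dataFilters` argument, a wrapper like this is only useful once `listFiles` actually receives the data filters, which is what this PR wires up.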

Does this PR introduce any user-facing change?

No

How was this patch tested?

Modified the unit tests for v2 file sources to verify the `dataFilters` are passed.

@gengliangwang
Member

Jenkins, test this please.

@gengliangwang gengliangwang left a comment

@guykhazma Thanks for working on it.
Two suggestions:

  1. Please create another JIRA. SPARK-30428 is for partition pruning.
  2. Please add more test cases.

@guykhazma guykhazma changed the title [SPARK-30428][SQL][FOLLOWUP] File source V2: Push data filters for file listing [SPARK-30475][SQL] File source V2: Push data filters for file listing Jan 9, 2020
@guykhazma
Contributor Author

guykhazma commented Jan 10, 2020

@gengliangwang as for the tests, I have added to the existing tests a check that the `dataFilters` are indeed passed to the `FileScan`.
In addition, I have added a test which doesn't have `partitionFilters`, so only the `dataFilters` should be non-empty.
Since the current `FileIndex` (`PartitioningAwareFileIndex`) is not affected by the `dataFilters`, there is no test that checks any pruning besides the pruning done by the `partitionFilters`.

@SparkQA

SparkQA commented Jan 10, 2020

Test build #116427 has finished for PR 27157 at commit 1a65933.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@guykhazma
Contributor Author

retest this please

@guykhazma
Contributor Author

@gengliangwang I have fixed the tests and also added a test for Avro scan without `partitionFilters`.

getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation,
scan.readPartitionSchema, filters, output)
// The dataFilters are pushed down only once
if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
Contributor Author

The reason for the condition

`(dataFilters.nonEmpty && scan.dataFilters.isEmpty)`

is that, unlike the `partitionFilters`, which are pushed down and don't need to be re-evaluated (which makes `partitionKeyFilters.nonEmpty` false in the next iteration), the `dataFilters` remain non-empty. The `scan.dataFilters.isEmpty` check is therefore needed to make sure the rule doesn't keep firing and cause a stack overflow.
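The termination concern can be sketched in isolation. This is plain illustrative Scala, not the actual rule: `MiniScan` stands in for `FileScan`, and the names are made up for this example. The rule-like function rewrites the scan only while the condition holds, so it reaches a no-op (a fixed point) on the second pass:

```scala
// Simplified model of the rule's termination condition.
case class MiniScan(dataFilters: Seq[String], partitionFilters: Seq[String])

def pushFilters(scan: MiniScan,
                partitionKeyFilters: Seq[String],
                dataFilters: Seq[String]): Option[MiniScan] =
  // Partition filters are consumed when pushed, so partitionKeyFilters becomes
  // empty on the next pass. Data filters stay in the plan, so without the
  // `scan.dataFilters.isEmpty` guard the condition would hold on every pass
  // and the rule would rewrite the scan forever.
  if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty))
    Some(scan.copy(
      dataFilters = dataFilters,
      partitionFilters = scan.partitionFilters ++ partitionKeyFilters))
  else
    None // fixed point: no change, the optimizer stops applying the rule

// First pass rewrites the scan; second pass is a no-op.
val once = pushFilters(MiniScan(Nil, Nil), Nil, Seq("a > 1")).get
assert(pushFilters(once, Nil, Seq("a > 1")).isEmpty)
```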

@gengliangwang
Member

@guykhazma sorry, but I still have concerns about this PR.
Could you give an example of "data skipping uniformly for all file based data sources" in the comments? #27112 (comment)

@gengliangwang
Member

retest this please.

@SparkQA

SparkQA commented Jan 13, 2020

Test build #116575 has finished for PR 27157 at commit 8ab97db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@guykhazma
Contributor Author

guykhazma commented Jan 13, 2020

@gengliangwang by "data skipping uniformly for all file based data sources" I mean that the above approach works uniformly for all formats, whether they support pushdown or not.
(It also benefits formats which support pushdown, such as Parquet, by avoiding the need to read the footer of each file.) See for example this Spark Summit talk.

Note that in Data Source V1 the `dataFilters` are also passed to the `listFiles` method in the `FileSourceScanExec` case class, which is used by all of the file-based datasources.

@guykhazma
Contributor Author

@gengliangwang see also this PR which originally added the dataFilters to the list files method.

@guykhazma
Contributor Author

@gengliangwang @cloud-fan can you please review this PR.

@gengliangwang
Member

@guykhazma Sorry for the late reply.
I was thinking about another approach, but I can't come up with a better one yet.

My major concern is that the filters are supposed to be pushed down in the `FileScanBuilder`; it is weird to push them down again in the `FileScan`. Technically, the partition filters should be pushed down in `FileScanBuilder` as well.
However, the current DSV2 API exposes the filters only as `Filter` instead of `Expression`, and the coverage of `Filter` is limited. That's why I pushed the partition filters into `FileScan` in #27112.

Keeping the existing behavior in V2 is also important, so I will merge this one. We can improve the approach in the future.

@guykhazma
Contributor Author

guykhazma commented Jan 18, 2020

@gengliangwang thanks for reviewing.
I agree with your concern; this can be improved in subsequent PRs, which will require a broader change in the V2 file-based data sources and the V2 API. I'll be glad to help with that.

@gengliangwang
Member

retest this please.

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117138 has finished for PR 27157 at commit d181e38.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

Thanks, merging to master.

@guykhazma
Contributor Author

@gengliangwang thanks for reviewing and merging!

cloud-fan pushed a commit that referenced this pull request Mar 23, 2021
### What changes were proposed in this pull request?

This bug was introduced by SPARK-30428 at Apache Spark 3.0.0.
This PR fixes `FileScan.equals()`.

### Why are the changes needed?
- Without this fix, `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- The partition filters and data filters added to `FileScan` (in #27112 and #27157) caused the canonicalized forms of some `BatchScanExec` nodes not to match, which prevents some reuse opportunities.

### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan`, and therefore of `BatchScanExec`, could have happened, causing correctness issues.

### How was this patch tested?
Added new UTs.

Closes #31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Mar 23, 2021
cloud-fan pushed a commit that referenced this pull request Mar 23, 2021
peter-toth added a commit to peter-toth/spark that referenced this pull request Mar 24, 2021
cloud-fan pushed a commit that referenced this pull request Mar 24, 2021
Closes #31952 from peter-toth/SPARK-34756-fix-filescan-equality-check-3.0.
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
leejaywei pushed a commit to Kyligence/spark that referenced this pull request Feb 3, 2023
Closes apache#27157 from guykhazma/PushdataFiltersInFileListing.

Authored-by: Guy Khazma <guykhag@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>

# Conflicts:
#	external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
#	external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
#	sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
leejaywei pushed a commit to Kyligence/spark that referenced this pull request May 15, 2023
leejaywei pushed a commit to Kyligence/spark that referenced this pull request May 16, 2023
leejaywei pushed a commit to Kyligence/spark that referenced this pull request Jun 16, 2023