Add metrics and cost tests for partition pruning effectiveness #5
Conversation
.createOrReplaceTempView("jt_array")

setConf(HiveUtils.CONVERT_METASTORE_PARQUET, true)
assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
Can you explain why you made this change?
This should no longer be needed since the flag value is true by default. I changed it to an assert to validate this.
This lets us get rid of the setConf(..., false) in the afterAll(), which was causing the conf value to be leaked to other suites.
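To illustrate the leak this thread describes, here is a minimal sketch of the assert-the-default pattern, assuming a ScalaTest-based Hive test suite with access to the test `spark` session; the class name is illustrative, not the actual suite touched by this PR:

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton

// Illustrative suite name; the real change is in an existing Parquet conversion suite.
class ParquetConversionConfSuite extends SparkFunSuite with TestHiveSingleton {
  override def beforeAll(): Unit = {
    super.beforeAll()
    // CONVERT_METASTORE_PARQUET already defaults to true, so validate it instead of
    // setting it. With nothing mutated here, afterAll() no longer needs a
    // setConf(..., false) that could leak "false" into whichever suite runs next
    // in the same JVM.
    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
  }
}
```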
Please rebase your changes off of 765f93c.

^ just did

Sorry, I'm not seeing that. I still see 21 commits in this PR.

Ah, I did a merge. You can "squash and merge" into the pr branch, right?
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))

/**
 * Tracks the total number of files discovered off of S3 by ListingFileCatalog.
I don't see how this is specific to S3.
Fixed
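Since this thread is about these counters, here is a small, self-contained sketch of the Dropwizard-metrics pattern the quoted code uses. This is not the actual `HiveCatalogMetrics` object and the helper names are made up; it only shows the idea: named counters on a `MetricRegistry`, incremented where partitions are fetched or files are listed, and reset between test cases so each assertion sees a single query's cost.

```scala
import com.codahale.metrics.MetricRegistry

object CatalogMetricsSketch {
  private val metricRegistry = new MetricRegistry

  // Counts partition metadata fetches from the metastore.
  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
  // Counts files discovered by file listing (not specific to S3).
  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))

  def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
  def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)

  // Tests call this before running a query so getCount() reflects only that query.
  def reset(): Unit = {
    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount)
    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount)
  }
}
```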
  }
}

test("late partition pruning reads only necessary partition data") {
I don't know what you mean by "late" here. Did you mean "lazy"?
Done
// of doing plan cache validation based on the entire partition set.
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
I would expect this to be 5 because this table has 5 partitions. Why does the test expect 10?
The first 5 are from resolving the table, and the latter 5 are from ListingFileCatalog. It is possible to optimize this down to 5, but it didn't seem worth the cost since this is (1) legacy mode and (2) not a regression.
Hm, maybe I can break it up into analysis and execution to make it more clear.
Not easy, so just added a comment here.
Thanks for the clarification. I think that adding the comment is good enough.
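For readers following along, here is the quoted assertion again with the breakdown from this thread spelled out as comments. The table name and counts come from the quoted test and the discussion above, not from running it here:

```scala
// The table `test` has 5 partitions. In legacy mode the metadata cost splits as:
//   5 partition fetches while resolving the table during analysis, plus
//   5 more when ListingFileCatalog lists the partitions for execution.
HiveCatalogMetrics.reset()
spark.sql("select * from test where partCol1 = 999").count()
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
```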
* [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query
* Add a new catalyst optimizer rule to SQL core for pruning unneeded partitions' files from a table file catalog
* Include the type of file catalog in the FileSourceScanExec metadata
* TODO: Consider renaming FileCatalog to better differentiate it from BasicFileCatalog (or vice-versa)
* try out parquet case insensitive fallback
* Refactor the FileSourceScanExec.metadata val to make it prettier
* fix and add test for input files
* rename test
* Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once instead of once per partition
* fix it
* more test cases
* also fix a bug with zero partitions selected
* feature flag
* add comments
* extend and fix flakiness in test
* Enhance `ParquetMetastoreSuite` with mixed-case partition columns
* Tidy up a little by removing some unused imports, an unused method and moving a protected method down and making it private
* Put partition count in `FileSourceScanExec.metadata` for partitioned tables
* Fix some errors in my revision of `ParquetSourceSuite`
* Thu Oct 13 17:18:14 PDT 2016
* more generic
* Thu Oct 13 18:09:42 PDT 2016
* Thu Oct 13 18:09:55 PDT 2016
* Thu Oct 13 18:22:31 PDT 2016
## What changes were proposed in this pull request?
This PR optimizes grouping expressions by removing repeated (semantically equal) expressions. A new rule, `RemoveRepetitionFromGroupExpressions`, is added.
**Before**
```scala
scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain()
== Physical Plan ==
WholeStageCodegen
: +- TungstenAggregate(key=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9], functions=[], output=[(a + 1)#5])
: +- INPUT
+- Exchange hashpartitioning((a#0 + 1)#6, (1 + a#0)#7, (A#0 + 1)#8, (1 + A#0)#9, 200), None
+- WholeStageCodegen
: +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6,(1 + a#0) AS (1 + a#0)#7,(A#0 + 1) AS (A#0 + 1)#8,(1 + A#0) AS (1 + A#0)#9], functions=[], output=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9])
: +- INPUT
+- LocalTableScan [a#0], [[1],[2]]
```
**After**
```scala
scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain()
== Physical Plan ==
WholeStageCodegen
: +- TungstenAggregate(key=[(a#0 + 1)#6], functions=[], output=[(a + 1)#5])
: +- INPUT
+- Exchange hashpartitioning((a#0 + 1)#6, 200), None
+- WholeStageCodegen
: +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6], functions=[], output=[(a#0 + 1)#6])
: +- INPUT
+- LocalTableScan [a#0], [[1],[2]]
```
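For context, a minimal sketch of how such an optimizer rule can be written in Catalyst, assuming `ExpressionSet` is used to deduplicate semantically equal grouping expressions; this sketches the idea rather than reproducing the exact rule as merged:

```scala
import org.apache.spark.sql.catalyst.expressions.ExpressionSet
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Drop grouping expressions that are semantically equal to an earlier one,
// e.g. `a + 1`, `1 + a`, and `A + 1` (after case-insensitive resolution) collapse to one key.
object RemoveRepetitionFromGroupExpressionsSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case agg @ Aggregate(grouping, _, _) =>
      agg.copy(groupingExpressions = ExpressionSet(grouping).toSeq)
  }
}
```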
## How was this patch tested?
Passes the Jenkins tests (with a new test case).
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes apache#12590 from dongjoon-hyun/SPARK-14830.
(cherry picked from commit 6e63201)
Signed-off-by: Michael Armbrust <michael@databricks.com>
## What changes were proposed in this pull request?
Implements the Every, Some, and Any aggregates in SQL. These new aggregate expressions are analyzed in the normal way and rewritten to equivalent existing aggregate expressions in the optimizer.
Every(x) => Min(x) where x is boolean.
Some(x) => Max(x) where x is boolean.
Any is a synonym for Some.
SQL
```
explain extended select every(v) from test_agg group by k;
```
Plan:
```
== Parsed Logical Plan ==
'Aggregate ['k], [unresolvedalias('every('v), None)]
+- 'UnresolvedRelation `test_agg`
== Analyzed Logical Plan ==
every(v): boolean
Aggregate [k#0], [every(v#1) AS every(v)#5]
+- SubqueryAlias `test_agg`
+- Project [k#0, v#1]
+- SubqueryAlias `test_agg`
+- LocalRelation [k#0, v#1]
== Optimized Logical Plan ==
Aggregate [k#0], [min(v#1) AS every(v)#5]
+- LocalRelation [k#0, v#1]
== Physical Plan ==
*(2) HashAggregate(keys=[k#0], functions=[min(v#1)], output=[every(v)#5])
+- Exchange hashpartitioning(k#0, 200)
+- *(1) HashAggregate(keys=[k#0], functions=[partial_min(v#1)], output=[k#0, min#7])
+- LocalTableScan [k#0, v#1]
Time taken: 0.512 seconds, Fetched 1 row(s)
```
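As a quick sanity check of the rewrite semantics, here is a hedged spark-shell style sketch (the data and column names are made up) showing that `every`/`some`/`any` over a boolean column agree with `min`/`max`, which is what the optimizer rewrites them to, assuming a Spark version that ships these aggregates:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min}

val spark = SparkSession.builder().master("local[*]").appName("every-some-any").getOrCreate()
import spark.implicits._

// Hypothetical data: booleans grouped by k. Group 1 mixes true/false, group 2 is all true.
val testAgg = Seq((1, true), (1, false), (2, true), (2, true)).toDF("k", "v")
testAgg.createOrReplaceTempView("test_agg")

// every(v) is true only when all values in the group are true; some(v)/any(v) when at least one is.
spark.sql("SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k").show()

// After the rewrite these are just min/max over the boolean column (false < true).
testAgg.groupBy($"k").agg(min($"v").as("every_v"), max($"v").as("some_v")).show()
```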
## How was this patch tested?
Added tests in SQLQueryTestSuite and DataFrameAggregateSuite.
Closes apache#22809 from dilipbiswal/SPARK-19851-specific-rewrite.
Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This adds tests that verify the expected metadata I/O cost of executing queries with metastore partition pruning enabled.
It also fixes a configuration leak in a suite that was causing other suites to fail by flipping `CONVERT_METASTORE_PARQUET`'s value.