[SPARK-31364][SQL][TESTS] Benchmark Parquet Nested Field Predicate Pushdown #28319
Conversation
ok to test

Thank you for your first contribution, @JiJiTang.
```scala
 * Results will be written to "benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt".
 * }}}
 */
object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
```
BTW, you need to switch to a JDK8 HOME in your environment and run the above command once more. That will additionally generate ParquetNestedPredicatePushDownBenchmark-results.txt.
Thanks a lot @dongjoon-hyun. I will run the benchmark with JDK8 and commit the report.
@dongjoon-hyun, JDK8 benchmark results pushed.
Thanks.
cc @dbtsai, @holdenk, @gatorsmile
Add jdk8 benchmark result
ok to test

Test build #121702 has finished for PR 28319 at commit
```scala
    name: String, filterFn: DataFrame => DataFrame): Unit = {
  val loadDF = spark.read.parquet(inputPath)
  benchmark.addCase(name) {
    _ =>
```
nit: move `_ =>` to the previous line.
```scala
private def addCase(benchmark: Benchmark, inputPath: String,
                    enableNestedPD: Boolean,
                    name: String, filterFn: DataFrame => DataFrame): Unit = {
```
We use 2-space indentation in our codebase.
```scala
private def addCase(
  benchmark: Benchmark, inputPath: String,
  enableNestedPD: Boolean,
  name: String, filterFn: DataFrame => DataFrame): Unit = {
```

nit: you might call `filterFn` `withFilter`.
Actually, 4-space indentation for method parameters :)
haha, my bad~ was sleepy... lol
```scala
df.write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath)
val benchmark = new Benchmark(name, N, NUMBER_OF_ITER, output = output)
addCase(benchmark, outputPath, enableNestedPD = false,
  "NestedFieldsPredicatePushDownDisabled", filterFn)
```
Maybe just call it "With nested predicate pushdown" and "Without nested predicate pushdown" for readability?
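The pattern under discussion (register one named case per pushdown setting, each applying a filter function, and time them) can be sketched outside Spark. The following is a hypothetical Python analogue, not Spark's `Benchmark` API; the case names follow the reviewer's suggestion:

```python
import time

def run_cases(cases, data, iterations=3):
    """Time each named case over `iterations` runs; keep the best time."""
    results = {}
    for name, filter_fn in cases:
        best = float("inf")
        for _ in range(iterations):
            start = time.perf_counter()
            filter_fn(data)          # the work being measured
            best = min(best, time.perf_counter() - start)
        results[name] = best
    return results

# Two cases, one per pushdown setting (the filter bodies here are placeholders).
cases = [
    ("With nested predicate pushdown", lambda rows: [r for r in rows if r < 0]),
    ("Without nested predicate pushdown", lambda rows: [r for r in rows if r < 0]),
]
timings = run_cases(cases, list(range(100_000)))
```

Keeping the best of several iterations, as above, reduces warm-up noise — the same reason the real benchmark runs `NUMBER_OF_ITER` iterations per case.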
```scala
 */
def runLoadNoRowGroupWhenPredicatePushedDown(): Unit = {
  // no row group will be loaded when predicate pushed down
  val filterFn: DataFrame => DataFrame = df => df.filter("nested.x < 0")
```
nit: you might call `filterFn` `withFilter`. Typically, the type is not required for non-public variables.
```scala
val filterFn: DataFrame => DataFrame = { df =>
  df.filter("nested.x >= 0").filter(s"nested.x <= $N")
}
createAndRunBenchmark("LoadAllRowGroupsWhenPredicatePushedDown", filterFn)
```
"All row groups can not be skipped"
```scala
def runLoadSomeRowGroupWhenPredicatePushedDown(): Unit = {
  // only a row group will be loaded when predicate pushed down
  val filterFn: DataFrame => DataFrame = df => df.filter("nested.x = 100")
  createAndRunBenchmark("LoadSomeRowGroupsWhenPredicatePushedDown", filterFn)
```
"Some row groups can be skipped with nested predicate pushdown"
```scala
def runLoadNoRowGroupWhenPredicatePushedDown(): Unit = {
  // no row group will be loaded when predicate pushed down
  val filterFn: DataFrame => DataFrame = df => df.filter("nested.x < 0")
  createAndRunBenchmark("LoadNoRowGroupsWhenPredicatePushedDown", filterFn)
```
"all row groups can be skipped with nested predicate pushdown"
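The three benchmark scenarios map directly onto row-group min/max pruning. A small Python sketch (illustrative values and names, not the Parquet reader API) shows how a reader decides which row groups to load for each predicate:

```python
def groups_to_load(row_groups, lo, hi):
    """Keep only groups whose [min, max] value range overlaps [lo, hi]."""
    return [g for g in row_groups if not (max(g) < lo or min(g) > hi)]

# Four row groups covering values 0..999, as min/max statistics would record
# (the real benchmark uses N = 100 * 1024 * 1024; 1000 keeps the sketch small).
groups = [list(range(i, i + 250)) for i in range(0, 1000, 250)]

# Scenario "no row groups loaded": nested.x < 0 — every group's min is >= 0.
assert groups_to_load(groups, float("-inf"), -1) == []
# Scenario "some row groups loaded": nested.x = 100 — one group contains 100.
assert len(groups_to_load(groups, 100, 100)) == 1
# Scenario "all row groups loaded": 0 <= nested.x <= N — nothing can be skipped,
# which is the worst case used to check for pushdown overhead.
assert len(groups_to_load(groups, 0, 1000)) == 4
```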
dbtsai left a comment:
Thanks for the benchmark.
Looks like nested predicate pushdown doesn't add overhead in the worst case, when no row group can be skipped. cc @HyukjinKwon @cloud-fan @gatorsmile and @rdblue
On this synthetic data, the performance improvement is very impressive; in some of our prod jobs, we see 10x gains.
LGTM on this PR except for some minor comments.
cc @MaxGekk
Thanks a lot @dbtsai, @cloud-fan. I have pushed a commit to fix the coding style and rename the benchmark cases. BTW, do we have an IDE formatter config somewhere so that we can apply the formatting before pushing commits?
```scala
private def createAndRunBenchmark(name: String, withFilter: DataFrame => DataFrame): Unit = {
  withTempPath {
    tempDir =>
```
nit: move this to the previous line.
```scala
withTempPath {
  tempDir =>
    val outputPath = tempDir.getCanonicalPath
    df.write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath)
```
`df.write.mode(SaveMode.Overwrite).parquet(outputPath)`?
Updated, @dbtsai. Also pushed the scalafmt-formatted source file.
@dbtsai, @cloud-fan, just found that there's …
Rename benchmark and apply scalafmt
```scala
private val N = 100 * 1024 * 1024
private val NUMBER_OF_ITER = 10

override def getSparkSession: SparkSession = {
```
Why do you override it? What's the problem with default settings?
@MaxGekk , thanks a lot, this was copied from FilterPushdownBenchmark.scala. Just realised the default setting also sets the master to local[1]...
override method removed.
```scala
}

private val df: DataFrame = spark
  .range(1, N, 1, 4)
```
What's the reason for creating 4 partitions (and 4 files) if you have only 1 CPU?
Hi @MaxGekk, the 4 partitions are there to make sure multiple row groups are created for the small benchmark Parquet dataset (I didn't change the Parquet row-group block size). Multiple partitions with 1 CPU simulate a production scenario where we have many partitions spread across a limited number of executors with a limited number of cores; with nested predicates pushed down, we get a big performance gain since we don't need to read all the row groups. Since the dataset in this benchmark is small, with multiple CPUs the partitions would be read in parallel when nested predicate pushdown is disabled, and we would not be able to see a clear performance gain in terms of job execution time.
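The point about 4 partitions on a single core can be made concrete with a small sketch (pure Python, hypothetical helper names): under a serial scan, every file that min/max pruning skips removes its entire scan cost from the total, which is exactly why the gain is visible with 1 CPU but would be hidden by parallel reads:

```python
def split_into_partitions(n, num_partitions):
    """Mimic spark.range(1, n, 1, num_partitions): contiguous value ranges."""
    step = (n - 1) // num_partitions
    return [list(range(1 + i * step, 1 + (i + 1) * step))
            for i in range(num_partitions)]

def serial_scan(partitions, keep_partition):
    """Rows actually read with 1 CPU when skippable partitions are dropped."""
    rows_read = 0
    for part in partitions:           # files are scanned one after another
        if keep_partition(part):      # min/max check, as in row-group pruning
            rows_read += len(part)
    return rows_read

parts = split_into_partitions(101, 4)                      # 4 files, 25 values each
full = serial_scan(parts, lambda p: True)                  # no pruning: 100 rows
pruned = serial_scan(parts, lambda p: min(p) <= 30 <= max(p))  # 1 file: 25 rows
```

With pruning, the serial scan touches a quarter of the rows; with parallel readers, the three skipped files would have been read concurrently anyway, so wall-clock time would barely change.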
LGTM.
remove override of spark session creation
Merged into master / branch-3.0. Thanks!
[SPARK-31364][SQL][TESTS] Benchmark Parquet Nested Field Predicate Pushdown

### What changes were proposed in this pull request?
This PR aims to add a benchmark suite for nested predicate pushdown with Parquet files. Performance comparison: nested predicate pushdown disabled vs. enabled, with the following query scenarios:
1. When the predicate is pushed down, the Parquet reader is able to filter out all the row groups without loading them.
2. When the predicate is pushed down, the Parquet reader only loads one of the row groups.
3. When the predicate is pushed down, the Parquet reader can't filter out any row group, in order to see whether we introduce too much overhead when enabling nested predicate pushdown.

### Why are the changes needed?
No benchmark exists today for evaluating nested field predicate pushdown performance.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Benchmark runs and reported results.

Closes #28319 from JiJiTang/SPARK-31364.
Authored-by: Jian Tang <jian_tang@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(cherry picked from commit 6a57616)
Signed-off-by: DB Tsai <d_tsai@apple.com>
@JiJiTang, do you have an Apache JIRA account so I can assign this ticket to you?
Test build #121778 has finished for PR 28319 at commit

Test build #121780 has finished for PR 28319 at commit

Test build #121783 has finished for PR 28319 at commit

@dbtsai, thanks a lot. Here is my JIRA account: https://issues.apache.org/jira/secure/ViewProfile.jspa?name=jijitang

Thanks, @JiJiTang.

Thanks a lot, @dongjoon-hyun.