[SPARK-29831][SQL] Scan Hive partitioned table should not dramatically increase data parallelism #26461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test build #113557 has finished for PR 26461 at commit
    .booleanConf
    .createWithDefault(true)

  val HIVE_TABLE_SCAN_MAX_PARALLELISM = buildConf("spark.sql.hive.tableScan.maxParallelism")
is this really useful? The parallelism should depend on data size, and it's a hard job to tune this config.
When reading a Hive partitioned table, users can get an unreasonable number of partitions, like tens of thousands.
The Hive scan node returns a UnionRDD over the Hive table partitions. Each Hive table partition is read as a HadoopRDD, and for each of them the parallelism depends on its data size. But the final UnionRDD sums up the parallelism of all the Hive table partitions.
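To illustrate the mechanism described above, here is a minimal spark-shell sketch (the partition counts are made up; this is not the PR's code):

// Runs in spark-shell, where `sc` is the SparkContext.
// Stand-ins for the per-Hive-partition HadoopRDDs built by the Hive scan node:
// 1000 Hive partitions, each read with 30 splits.
val perPartitionRdds = (1 to 1000).map(_ => sc.parallelize(1 to 10, numSlices = 30))

// The union does not re-plan splits: its partition count is simply the sum of its
// members' partition counts, i.e. 1000 * 30 = 30000 tasks for a single scan.
val unioned = sc.union(perPartitionRdds)
println(unioned.getNumPartitions) // 30000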
is it possible to get the size of each hadoop RDD and do coalesce automatically?
I think we can get the split size and the total number of splits of a Hadoop RDD.
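A rough sketch of what that automatic coalesce could look like, assuming the total input size of the scan is known (the helper name and threshold parameter are hypothetical, not from this PR):

import org.apache.spark.rdd.RDD

// Hypothetical helper: shrink the partition count of the unioned scan RDD when
// it exceeds what the data size justifies.
def coalesceByDataSize[T](rdd: RDD[T], totalBytes: Long, targetBytesPerTask: Long): RDD[T] = {
  val desired = math.max(1L, totalBytes / math.max(1L, targetBytesPerTask)).toInt
  if (rdd.getNumPartitions > desired) {
    rdd.coalesce(desired, shuffle = false) // narrow coalesce, no shuffle
  } else {
    rdd
  }
}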
| "RDD of reading such table is larger than this value, Spark will reduce the partition " + | ||
| "number by doing a coalesce on the RDD.") | ||
| .intConf | ||
| .createOptional |
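Stitching the two diff fragments above together, the proposed config presumably reads roughly as follows (the first part of the doc string is paraphrased, since the diff only shows its tail):

  val HIVE_TABLE_SCAN_MAX_PARALLELISM =
    buildConf("spark.sql.hive.tableScan.maxParallelism")
      .doc("The maximum data parallelism of scanning a Hive partitioned table. If the " +
        "RDD of reading such table is larger than this value, Spark will reduce the partition " +
        "number by doing a coalesce on the RDD.")
      .intConf
      .createOptional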
Do we need a default value?
The reason I leave it optional is to allow us to keep the current behavior.
dongjoon-hyun
left a comment
@viirya. This is another magic number depending on the data and cluster. Why not recommend our general hints instead? Are they not enough?
SELECT /*+ COALESCE(numPartitions) */
SELECT /*+ REPARTITION(numPartitions) */
cc @dbtsai, too.
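For comparison, the hints above (or a Dataset-level coalesce) would be applied per query, e.g. on a hypothetical Hive table:

// SQL hint form, on a made-up partitioned Hive table.
val viaHint = spark.sql("SELECT /*+ COALESCE(200) */ * FROM hive_db.events WHERE dt = '2019-11-10'")

// Equivalent Dataset API form.
val viaApi = spark.table("hive_db.events").where("dt = '2019-11-10'").coalesce(200)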
    }
  }

  test("HiveTableScanExec should not increase data parallelism")
should not increase data parallelism -> should respect HIVE_TABLE_SCAN_MAX_PARALLELISM?
@dongjoon-hyun Thanks for the review. As I mentioned in the description, although end-users can add a coalesce via the Dataset or SQL APIs, the reasons to add this config are:
The current behavior of the Hive scan is not friendly to end-users and cluster operators. We have better behavior on the datasource table scan node, but in some use cases we still need to read Hive tables.
The optimal value for each table is unknown, isn't it? Because of that, this PR doesn't give any clue about a default value for this conf.
I am also considering @cloud-fan's suggestion to use the data size to decide whether to add a coalesce or not.
Well, I think end-users usually do not know why there is a union and why the job has big parallelism. These are implementation details under the Hive scan node. Users need to dig into the source code, or ask cluster operators, in order to know that. End-users know their data and queries, but that does not mean they also know where the big parallelism comes from. In fact, because they know their data and queries, they are even more confused, since the big parallelism makes no sense given that data and those queries. This is a config that can be used by both end-users and cluster operators; before it, cluster operators could not do anything. It is easy to set a config value, but it is hard to insert a hint into end-users' queries.
This sounds like a good point. However, for tables and queries that need tuning, you can still change the config value, or disable it and turn to hints. This config is a guard against the unreasonable number of partitions seen when reading Hive partitioned tables.
Another point: for the datasource table scan node, the parallelism is controlled by configs (spark.default.parallelism and spark.sql.files.maxPartitionBytes), but for the Hive table scan node we have nothing to control it. So currently, when reading a datasource table, end-users do not need to worry about parallelism; when reading a Hive table, end-users need to add custom coalesce hints whenever big parallelism shows up.
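For context, the existing file-source knobs are session-settable (the values below are arbitrary):

// File-source scans are bounded by these existing configs;
// spark.default.parallelism is set on the SparkConf / cluster side.
spark.conf.set("spark.sql.files.maxPartitionBytes", 256L * 1024 * 1024) // max bytes per split
spark.conf.set("spark.sql.files.openCostInBytes", 4L * 1024 * 1024)     // padding per opened file

// The Hive scan node (one HadoopRDD per Hive partition, all unioned) has no such knob,
// which is the gap the proposed config is meant to fill.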
For those performance reasons, Apache Spark already converts Hive tables to data source tables, doesn't it? Do you need this for your non-convertible Hive tables? If so, can we have a more specifically focused PR description?
Thanks. I will update the description.
I agree with the problem mentioned by @viirya, but I'm not sure this config is the right cure. Users still need to know about the big parallelism problem and set the config carefully. The file source config
I'm OK with having a config for the Hive table scan, but we should make it simple to set.
Because the default parallelism affects the split size calculation in spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala (lines 86 to 95 at 053dd85).
IIUC, maybe we can also rely on
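Paraphrasing the FilePartition.scala logic referenced above (not an exact quote of lines 86 to 95), the target split size is derived from the default parallelism roughly like this:

// Sketch of the referenced computation; parameter names mirror the configs involved.
def maxSplitBytes(
    defaultMaxSplitBytes: Long, // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long,      // spark.sql.files.openCostInBytes
    defaultParallelism: Int,    // spark.default.parallelism
    fileSizes: Seq[Long]): Long = {
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  // More cores => smaller bytesPerCore => smaller splits => more partitions.
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}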
SGTM
HyukjinKwon
left a comment
Well, to me, I actually agree with Dongjoon's point. Why don't we just use an explicit coalesce, or hints, for that? There are also some alternatives, like converting the Hive table scan to Spark's own scan.
coalesce does not necessarily make it faster. On the flip side, users might get surprised by a coalesce popping up suddenly.
This sounds fine in general, but IIRC there have been several tries to merge big Hive partitions, if I am not wrong; however, it needed a pretty big change, which I don't think is worthwhile, e.g. #10572.
@HyukjinKwon Thanks for the comment!
We encourage users to convert to datasource tables, but there are inconvertible cases. We have configs for the datasource table scan, but not for Hive tables. That means we expect the datasource scan to have a reasonable partition number, but not the Hive scan. For Hive table users, things get troublesome, as you need to add coalesce/hints for every query. I think the big parallelism gets more attention from end-users and causes more confusion. A big number of partitions also wastes time on task scheduling.
I think this change should not be as big as that one.
For now we will take another approach to this issue. The Hive scan has a few other open questions, like predicate pushdown and schema pruning. Although I think this is a real problem, I may not have time to follow up on it, so I'm closing this for now.
What changes were proposed in this pull request?
The Hive table scan operator reads each Hive partition as a HadoopRDD and unions all the RDDs. The data parallelism of the resulting RDD can increase dramatically when reading many partitions with many files.
This patch proposes to add a config to limit the maximum data parallelism for scanning a Hive partitioned table, for the cases where we cannot convert the Hive table to a datasource table.
Why are the changes needed?
Although users can also do a coalesce by themselves, this patch proposes to add a config to limit the maximum data parallelism, because:
Although we convert a Hive table scan to a datasource table scan most of the time, we still have some inconvertible tables. For the datasource table scan node, the parallelism is controlled by the configs spark.default.parallelism and spark.sql.files.maxPartitionBytes. But for the Hive table scan node, we have nothing to control it.
So currently, when reading a datasource table, end-users do not need to worry about parallelism. When reading a Hive table, end-users need to add custom coalesce hints whenever big parallelism shows up.
Does this PR introduce any user-facing change?
No, if the config is not set.
If a maximum value is set via the config, then when scanning a Hive partitioned table, once the number of partitions exceeds the maximum, Spark coalesces the resulting RDD.
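For example, assuming a heavily partitioned Hive table (the table name and value below are made up):

// With the proposed config set, an oversized Hive scan RDD is coalesced down.
spark.conf.set("spark.sql.hive.tableScan.maxParallelism", 2000L)

val df = spark.table("hive_db.many_partitions_tbl")
// Without the config this could report tens of thousands of partitions;
// with it, at most 2000.
println(df.rdd.getNumPartitions)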
How was this patch tested?