-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[opt](hive) Speed up Hive insert on partition tables using cache #58166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
TPC-H: Total hot run time: 34362 ms |
TPC-DS: Total hot run time: 188373 ms |
ClickBench: Total hot run time: 27.48 s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR optimizes Hive INSERT performance on partitioned tables by leveraging cache instead of direct HMS queries and implementing selective cache refresh. The key problem addressed is that for tables with 10K+ partitions, INSERT operations are extremely slow due to expensive HMS RPC calls and full table cache invalidation.
Key changes:
- Modified partition metadata retrieval during INSERT to use cache (HiveTableSink.java) instead of direct HMS queries
- Implemented selective partition cache refresh after commit based on BE update info (NEW/APPEND/OVERWRITE)
- Added cache miss detection and graceful handling when BE marks partition as NEW but it already exists in HMS
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| test_hive_partitions.groovy | Adds test case for cache miss scenario where Hive creates a partition that Doris then writes to |
| HiveTableSink.java | Changes partition retrieval from HMS client to cache-based approach with profiling |
| HiveInsertExecutor.java | Adds partition update tracking and selective cache refresh after commit |
| BaseExternalTableInsertExecutor.java | Introduces doAfterCommit() hook with default full table refresh behavior |
| HiveMetaStoreCache.java | Implements refreshAffectedPartitions() for selective partition cache invalidation |
| HMSTransaction.java | Adds HMS existence check for NEW partitions to handle cache misses gracefully |
| SummaryProfile.java | Adds profiling metrics for sink partition value setting operation |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
FE Regression Coverage ReportIncrement line coverage |
f37442a to
2908fe8
Compare
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
Outdated
Show resolved
Hide resolved
| boolean hasNewPartitions = false; | ||
|
|
||
| for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) { | ||
| List<String> partitionValues = HiveUtil.toPartitionValues(update.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this line to case OVERWRITE:
|
|
||
| // Refresh partition values cache if new partitions were created | ||
| if (hasNewPartitions) { | ||
| invalidatePartitionValuesCache(nameMapping); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just modify the cache value instead of invalidate it all?
Otherwise, for insert with new partition, it always invalidate all cache
| LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND", | ||
| pu.getName()); | ||
| insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics)); | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this else block is same as case OVERWRITE.
extract a method for it.
Take care of addPartition() and dropPartition()
| if (partitionUpdates != null && !partitionUpdates.isEmpty()) { | ||
| HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() | ||
| .getMetaStoreCache((HMSExternalCatalog) ((HMSExternalTable) table).getCatalog()); | ||
| cache.refreshAffectedPartitions((HMSExternalTable) table, partitionUpdates); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before, the Env.getCurrentEnv().getRefreshManager().handleRefreshTable() will write edit log, so that non-master FE will get the latest partition info.
| for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) { | ||
|
|
||
| // Get partitions from cache instead of HMS client (similar to HiveScanNode) | ||
| HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this line to if (targetTable.isPartitionedTable())
| // Get all partition values from cache | ||
| List<Type> partitionColumnTypes = targetTable.getPartitionColumnTypes( | ||
| MvccUtil.getSnapshotFromContext(targetTable)); | ||
| HiveMetaStoreCache.HivePartitionValues partitionValues = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebase and use targetTable.getHivePartitionValues() directly
2908fe8 to
6c6b4ca
Compare
|
run buildall |
TPC-H: Total hot run time: 34900 ms |
TPC-DS: Total hot run time: 187095 ms |
ClickBench: Total hot run time: 27.39 s |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
run buildall |
TPC-H: Total hot run time: 33961 ms |
TPC-DS: Total hot run time: 184561 ms |
ClickBench: Total hot run time: 27.74 s |
FE Regression Coverage ReportIncrement line coverage |
f988f83 to
992bca6
Compare
|
run buildall |
TPC-H: Total hot run time: 35248 ms |
TPC-DS: Total hot run time: 182146 ms |
ClickBench: Total hot run time: 27.2 s |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
) ### What problem does this PR solve? For Hive tables with massive partitions (10K+), INSERT operations are extremely slow because: - FE fetches all partition metadata from HMS directly (expensive RPC calls) - Full table cache invalidation after each insert (unnecessary) Problem Summary: 1. **Use cache for partition metadata in INSERT** - FE now fetches partition info from cache instead of directly querying HMS when preparing INSERT - Avoid expensive HMS RPC calls for every INSERT operation 2. **Selective cache refresh after commit** - Only invalidate affected partitions instead of full table cache - Based on partition update info from BE (NEW/APPEND/OVERWRITE) - Significantly reduces cache invalidation overhead 3. **Handle cache inconsistency gracefully** - When BE marks partition as NEW but it already exists in HMS (cache miss) - FE detects this by checking HMS and treats it as APPEND instead of failing - Prevents `AlreadyExistsException` errors For tables with partitions: - **Before**: HMS calls per INSERT + full cache invalidation - **After**: cache lookup + selective partition refresh - Expected speedup: 10x-100x for partition metadata fetching phas
…les (#58606) ### Problem Reproduction Steps: Create a Hive Catalog, create an unpartitioned table, then insert data. The following failure occurs. ``` copy file failed: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist. (Service: S3, Status Code: 404, ``` The BE mistakenly treats non-partitioned tables as partitioned ones. For partitioned tables, the system always appends a folder suffix for each partition, organizing data into partition directories. However, non-partitioned tables do not require partition information. In this case, the BE incorrectly added a partition folder suffix for non-partitioned tables, causing the insert operation to fail. ### Solution - Skip setting partition information for non-partitioned tables in the BE. - Maintain current behavior for partitioned tables, including folder suffix handling. ### Result - Inserts into non-partitioned object storage tables succeed. - Partitioned tables continue to work as expected. This issue was introduced in #58166
…#58748) ### What problem does this PR solve? Followup #58166 In #58166, the edit log need record "modified partitions" and "new partitions" separately, so that non-master FE can correctly update the partition cache. Otherwise, some new partitions can not be queried in non-master FE after inserting.
…che#58166) For Hive tables with massive partitions (10K+), INSERT operations are extremely slow because: - FE fetches all partition metadata from HMS directly (expensive RPC calls) - Full table cache invalidation after each insert (unnecessary) Problem Summary: 1. **Use cache for partition metadata in INSERT** - FE now fetches partition info from cache instead of directly querying HMS when preparing INSERT - Avoid expensive HMS RPC calls for every INSERT operation 2. **Selective cache refresh after commit** - Only invalidate affected partitions instead of full table cache - Based on partition update info from BE (NEW/APPEND/OVERWRITE) - Significantly reduces cache invalidation overhead 3. **Handle cache inconsistency gracefully** - When BE marks partition as NEW but it already exists in HMS (cache miss) - FE detects this by checking HMS and treats it as APPEND instead of failing - Prevents `AlreadyExistsException` errors For tables with partitions: - **Before**: HMS calls per INSERT + full cache invalidation - **After**: cache lookup + selective partition refresh - Expected speedup: 10x-100x for partition metadata fetching phas
…les (apache#58606) ### Problem Reproduction Steps: Create a Hive Catalog, create an unpartitioned table, then insert data. The following failure occurs. ``` copy file failed: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist. (Service: S3, Status Code: 404, ``` The BE mistakenly treats non-partitioned tables as partitioned ones. For partitioned tables, the system always appends a folder suffix for each partition, organizing data into partition directories. However, non-partitioned tables do not require partition information. In this case, the BE incorrectly added a partition folder suffix for non-partitioned tables, causing the insert operation to fail. ### Solution - Skip setting partition information for non-partitioned tables in the BE. - Maintain current behavior for partitioned tables, including folder suffix handling. ### Result - Inserts into non-partitioned object storage tables succeed. - Partitioned tables continue to work as expected. This issue was introduced in apache#58166
…apache#58748) ### What problem does this PR solve? Followup apache#58166 In apache#58166, the edit log need record "modified partitions" and "new partitions" separately, so that non-master FE can correctly update the partition cache. Otherwise, some new partitions can not be queried in non-master FE after inserting.
…che#58166) ### What problem does this PR solve? For Hive tables with massive partitions (10K+), INSERT operations are extremely slow because: - FE fetches all partition metadata from HMS directly (expensive RPC calls) - Full table cache invalidation after each insert (unnecessary) Problem Summary: 1. **Use cache for partition metadata in INSERT** - FE now fetches partition info from cache instead of directly querying HMS when preparing INSERT - Avoid expensive HMS RPC calls for every INSERT operation 2. **Selective cache refresh after commit** - Only invalidate affected partitions instead of full table cache - Based on partition update info from BE (NEW/APPEND/OVERWRITE) - Significantly reduces cache invalidation overhead 3. **Handle cache inconsistency gracefully** - When BE marks partition as NEW but it already exists in HMS (cache miss) - FE detects this by checking HMS and treats it as APPEND instead of failing - Prevents `AlreadyExistsException` errors For tables with partitions: - **Before**: HMS calls per INSERT + full cache invalidation - **After**: cache lookup + selective partition refresh - Expected speedup: 10x-100x for partition metadata fetching phas
…les (apache#58606) ### Problem Reproduction Steps: Create a Hive Catalog, create an unpartitioned table, then insert data. The following failure occurs. ``` copy file failed: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist. (Service: S3, Status Code: 404, ``` The BE mistakenly treats non-partitioned tables as partitioned ones. For partitioned tables, the system always appends a folder suffix for each partition, organizing data into partition directories. However, non-partitioned tables do not require partition information. In this case, the BE incorrectly added a partition folder suffix for non-partitioned tables, causing the insert operation to fail. ### Solution - Skip setting partition information for non-partitioned tables in the BE. - Maintain current behavior for partitioned tables, including folder suffix handling. ### Result - Inserts into non-partitioned object storage tables succeed. - Partitioned tables continue to work as expected. This issue was introduced in apache#58166
…apache#58748) ### What problem does this PR solve? Followup apache#58166 In apache#58166, the edit log need record "modified partitions" and "new partitions" separately, so that non-master FE can correctly update the partition cache. Otherwise, some new partitions can not be queried in non-master FE after inserting.
…les using cache apache#58166 apache#58606 apache#58748 (apache#58886)" This reverts commit a5ce97a.
What problem does this PR solve?
For Hive tables with massive partitions (10K+), INSERT operations are extremely slow because:
Problem Summary:
AlreadyExistsExceptionerrorsFor tables with partitions:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)