diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index 67c8768899445f..9738ee39355ada 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -26,7 +26,9 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.persist.OperationType; import com.google.common.base.Strings; @@ -183,13 +185,20 @@ public void replayRefreshTable(ExternalObjectLog log) { db.get().unregisterTable(log.getTableName()); db.get().resetMetaCacheNames(); } else { - List partitionNames = log.getPartitionNames(); - if (partitionNames != null && !partitionNames.isEmpty()) { - // Partition-level cache invalidation - Env.getCurrentEnv().getExtMetaCacheMgr() - .invalidatePartitionsCache(table.get(), partitionNames); - LOG.info("replay refresh partitions for table {}, partitions count: {}", - table.get().getName(), partitionNames.size()); + List modifiedPartNames = log.getPartitionNames(); + List newPartNames = log.getNewPartitionNames(); + if (catalog instanceof HMSExternalCatalog + && ((modifiedPartNames != null && !modifiedPartNames.isEmpty()) + || (newPartNames != null && !newPartNames.isEmpty()))) { + // Partition-level cache invalidation, only for hive catalog + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) catalog); + cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames); + LOG.info("replay refresh partitions for table {}, " + + "modified partitions count: {}, " + + "new partitions count: {}", + table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(), + newPartNames == null ? 0 : newPartNames.size()); } else { // Full table cache invalidation refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java index 313f6fc403f63e..43a0675d783c41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java @@ -58,6 +58,9 @@ public class ExternalObjectLog implements Writable { @SerializedName(value = "partitionNames") private List partitionNames; + @SerializedName(value = "newPartitionNames") + private List newPartitionNames; + @SerializedName(value = "lastUpdateTime") private long lastUpdateTime; @@ -81,12 +84,13 @@ public static ExternalObjectLog createForRefreshTable(long catalogId, String dbN } public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName, - List partitionNames) { + List modifiedPartNames, List newPartNames) { ExternalObjectLog externalObjectLog = new ExternalObjectLog(); externalObjectLog.setCatalogId(catalogId); externalObjectLog.setDbName(dbName); externalObjectLog.setTableName(tblName); - externalObjectLog.setPartitionNames(partitionNames); + externalObjectLog.setPartitionNames(modifiedPartNames); + externalObjectLog.setNewPartitionNames(newPartNames); return externalObjectLog; } @@ -134,6 +138,12 @@ public String debugForRefreshTable() { } else { sb.append("tableId: " + tableId + "]"); } + if (partitionNames != null && !partitionNames.isEmpty()) { + sb.append(", partitionNames: " + partitionNames); + } + if (newPartitionNames != null && !newPartitionNames.isEmpty()) { + sb.append(", newPartitionNames: " + newPartitionNames); + } return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 82504111e34b5a..c6ceb2f4f2033e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -573,16 +573,16 @@ public void invalidatePartitionCache(ExternalTable dorisTable, String partitionN * * @param table The Hive table whose partitions were modified * @param partitionUpdates List of partition updates from BE + * @param modifiedPartNames Output list to collect names of modified partitions + * @param newPartNames Output list to collect names of new partitions */ public void refreshAffectedPartitions(HMSExternalTable table, - List partitionUpdates) { + List partitionUpdates, + List modifiedPartNames, List newPartNames) { if (partitionUpdates == null || partitionUpdates.isEmpty()) { return; } - List modifiedPartitionNames = new ArrayList<>(); - List newPartitionNames = new ArrayList<>(); - for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) { String partitionName = update.getName(); // Skip if partition name is null/empty (non-partitioned table case) @@ -593,10 +593,10 @@ public void refreshAffectedPartitions(HMSExternalTable table, switch (update.getUpdateMode()) { case APPEND: case OVERWRITE: - modifiedPartitionNames.add(partitionName); + modifiedPartNames.add(partitionName); break; case NEW: - newPartitionNames.add(partitionName); + newPartNames.add(partitionName); break; default: LOG.warn("Unknown update mode {} for partition {}", @@ -605,20 +605,26 @@ public void refreshAffectedPartitions(HMSExternalTable table, } } + refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames); + } + + public void refreshAffectedPartitionsCache(HMSExternalTable table, + List modifiedPartNames, List newPartNames) { + // Invalidate cache for modified partitions (both partition cache and file cache) - for (String partitionName : modifiedPartitionNames) { + for (String partitionName : modifiedPartNames) { invalidatePartitionCache(table, partitionName); } // Add new partitions to partition values cache - if (!newPartitionNames.isEmpty()) { - addPartitionsCache(table.getOrBuildNameMapping(), newPartitionNames, + if (!newPartNames.isEmpty()) { + addPartitionsCache(table.getOrBuildNameMapping(), newPartNames, table.getPartitionColumnTypes(Optional.empty())); } // Log summary LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions", - table.getName(), modifiedPartitionNames.size(), newPartitionNames.size()); + table.getName(), modifiedPartNames.size(), newPartNames.size()); } public void invalidateDbCache(String dbName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index fed495dbb352c2..9f8be52afec30d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -34,10 +34,10 @@ import org.apache.doris.transaction.TransactionType; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -86,20 +86,12 @@ protected void doAfterCommit() throws DdlException { // For partitioned tables, do selective partition refresh // For non-partitioned tables, do full table cache invalidation - List affectedPartitionNames = null; + List modifiedPartNames = Lists.newArrayList(); + List newPartNames = Lists.newArrayList(); if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); - cache.refreshAffectedPartitions(hmsTable, partitionUpdates); - - // Collect partition names for edit log - affectedPartitionNames = new ArrayList<>(); - for (THivePartitionUpdate update : partitionUpdates) { - String partitionName = update.getName(); - if (partitionName != null && !partitionName.isEmpty()) { - affectedPartitionNames.add(partitionName); - } - } + cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames); } else { // Non-partitioned table or no partition updates, do full table refresh Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable); @@ -107,13 +99,14 @@ protected void doAfterCommit() throws DdlException { // Write edit log to notify other FEs ExternalObjectLog log; - if (affectedPartitionNames != null && !affectedPartitionNames.isEmpty()) { + if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) { // Partition-level refresh for other FEs log = ExternalObjectLog.createForRefreshPartitions( hmsTable.getCatalog().getId(), table.getDatabase().getFullName(), table.getName(), - affectedPartitionNames); + modifiedPartNames, + newPartNames); } else { // Full table refresh for other FEs log = ExternalObjectLog.createForRefreshTable(