diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 494946fd1c0cb6..67c8768899445f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -183,7 +183,17 @@ public void replayRefreshTable(ExternalObjectLog log) {
             db.get().unregisterTable(log.getTableName());
             db.get().resetMetaCacheNames();
         } else {
-            refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            List<String> partitionNames = log.getPartitionNames();
+            if (partitionNames != null && !partitionNames.isEmpty()) {
+                // Partition-level cache invalidation
+                Env.getCurrentEnv().getExtMetaCacheMgr()
+                        .invalidatePartitionsCache(table.get(), partitionNames);
+                LOG.info("replay refresh partitions for table {}, partitions count: {}",
+                        table.get().getName(), partitionNames.size());
+            } else {
+                // Full table cache invalidation
+                refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            }
         }
     }
 
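
The replay branch above keys off nothing but the presence of partition names in the log, so the contract between the writer (HiveInsertExecutor, later in this diff) and the replayer reduces to one predicate. A minimal sketch of that assumed contract:

    // Sketch of the dispatch rule assumed by replayRefreshTable(): a log
    // carrying a non-empty partition list means "invalidate exactly these
    // partitions"; a null/empty list degrades to the pre-existing
    // full-table refresh.
    static boolean isPartitionLevelRefresh(ExternalObjectLog log) {
        List<String> partitionNames = log.getPartitionNames();
        return partitionNames != null && !partitionNames.isEmpty();
    }
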
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 4e99ee14cf76a6..da86a3cef559de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -85,6 +85,7 @@ public class SummaryProfile {
     public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
     public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
     public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
+    public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
     public static final String PLAN_TIME = "Plan Time";
     public static final String SCHEDULE_TIME = "Schedule Time";
     public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -161,6 +162,7 @@ public class SummaryProfile {
             GET_SPLITS_TIME,
             GET_PARTITIONS_TIME,
             GET_PARTITION_FILES_TIME,
+            SINK_SET_PARTITION_VALUES_TIME,
             CREATE_SCAN_RANGE_TIME,
             NEREIDS_DISTRIBUTE_TIME,
             GET_META_VERSION_TIME,
@@ -211,6 +213,7 @@ public class SummaryProfile {
             .put(NEREIDS_BE_FOLD_CONST_TIME, 2)
             .put(GET_PARTITIONS_TIME, 3)
             .put(GET_PARTITION_FILES_TIME, 3)
+            .put(SINK_SET_PARTITION_VALUES_TIME, 3)
             .put(CREATE_SCAN_RANGE_TIME, 2)
             .put(GET_PARTITION_VERSION_TIME, 1)
             .put(GET_PARTITION_VERSION_COUNT, 1)
@@ -281,6 +284,10 @@ public class SummaryProfile {
     private long getPartitionsFinishTime = -1;
     @SerializedName(value = "getPartitionFilesFinishTime")
     private long getPartitionFilesFinishTime = -1;
+    @SerializedName(value = "sinkSetPartitionValuesStartTime")
+    private long sinkSetPartitionValuesStartTime = -1;
+    @SerializedName(value = "sinkSetPartitionValuesFinishTime")
+    private long sinkSetPartitionValuesFinishTime = -1;
     @SerializedName(value = "getSplitsFinishTime")
     private long getSplitsFinishTime = -1;
     @SerializedName(value = "createScanRangeFinishTime")
@@ -477,6 +484,8 @@ private void updateExecutionSummaryProfile() {
                 getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
                 getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
+                getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
                 getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -619,6 +628,14 @@ public void setGetPartitionsFinishTime() {
         this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
     }
 
+    public void setSinkGetPartitionsStartTime() {
+        this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setSinkGetPartitionsFinishTime() {
+        this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
+    }
+
     public void setGetPartitionFilesFinishTime() {
         this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
     }
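
The new entry follows the existing start/finish-timestamp convention in SummaryProfile: both fields begin at -1, each hook stamps the current time, and updateExecutionSummaryProfile() prints the difference. A sketch of the bracket pattern as the sink is expected to use it (doWork() is a placeholder, not a real method):

    // Bracket pattern for the new "Sink Set Partition Values Time" entry.
    summaryProfile.setSinkGetPartitionsStartTime();
    doWork(); // the partition-listing work inside HiveTableSink.setPartitionValues()
    summaryProfile.setSinkGetPartitionsFinishTime();
    // updateExecutionSummaryProfile() later renders
    // getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS)
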
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index cf7872f8b39c26..313f6fc403f63e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -80,6 +80,16 @@ public static ExternalObjectLog createForRefreshTable(long catalogId, String dbN
         return externalObjectLog;
     }
 
+    public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
+            List<String> partitionNames) {
+        ExternalObjectLog externalObjectLog = new ExternalObjectLog();
+        externalObjectLog.setCatalogId(catalogId);
+        externalObjectLog.setDbName(dbName);
+        externalObjectLog.setTableName(tblName);
+        externalObjectLog.setPartitionNames(partitionNames);
+        return externalObjectLog;
+    }
+
     public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
             String newTblName) {
         ExternalObjectLog externalObjectLog = new ExternalObjectLog();
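
A usage sketch of the two factory shapes (identifier values illustrative): the partition-level log is what the new replay branch in RefreshManager consumes, while the table-level factory leaves the partition list null so followers fall back to refreshTableInternal():

    // Partition-level: followers invalidate only pt=4.
    ExternalObjectLog partitionLog = ExternalObjectLog.createForRefreshPartitions(
            catalogId, "db1", "tbl1", Lists.newArrayList("pt=4"));
    // Table-level: partitionNames stays null, followers do a full refresh.
    ExternalObjectLog tableLog = ExternalObjectLog.createForRefreshTable(
            catalogId, "db1", "tbl1");
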
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 7141bdd98745fd..54ee9f46ec54bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,6 +47,7 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -268,30 +269,48 @@ public void finishInsertTable(NameMapping nameMapping) {
                         insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
                         break;
                     case NEW:
+                        // Check whether the partition really exists in HMS (it may be a cache miss in Doris)
+                        String partitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(partitionName)) {
+                            // This should not happen for partitioned tables
+                            LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
+                            break;
+                        }
+                        List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
+                        boolean existsInHMS = false;
+                        try {
+                            Partition hmsPartition = hiveOps.getClient().getPartition(
+                                    nameMapping.getRemoteDbName(),
+                                    nameMapping.getRemoteTblName(),
+                                    partitionValues);
+                            existsInHMS = (hmsPartition != null);
+                        } catch (Exception e) {
+                            // Partition not found in HMS, treat it as truly new
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
+                            }
+                        }
+
+                        if (existsInHMS) {
+                            // The partition exists in HMS but not in the Doris cache.
+                            // Treat it as APPEND instead of NEW to avoid a creation error.
+                            LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
+                                    pu.getName());
+                            insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+                        } else {
+                            // Truly new partition, create it
+                            createAndAddPartition(nameMapping, table, partitionValues, writePath,
+                                    pu, hivePartitionStatistics, false);
+                        }
+                        break;
                     case OVERWRITE:
-                        StorageDescriptor sd = table.getSd();
-                        // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
-                        // For HDFS, use targetPath which is the final path after rename
-                        String pathForHMS = this.fileType == TFileType.FILE_S3
-                                ? writePath
-                                : pu.getLocation().getTargetPath();
-                        HivePartition hivePartition = new HivePartition(
-                                nameMapping,
-                                false,
-                                sd.getInputFormat(),
-                                pathForHMS,
-                                HiveUtil.toPartitionValues(pu.getName()),
-                                Maps.newHashMap(),
-                                sd.getOutputFormat(),
-                                sd.getSerdeInfo().getSerializationLib(),
-                                sd.getCols()
-                        );
-                        if (updateMode == TUpdateMode.OVERWRITE) {
-                            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+                        String overwritePartitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(overwritePartitionName)) {
+                            LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
+                            break;
                         }
-                        addPartition(
-                                nameMapping, hivePartition, writePath,
-                                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+                        createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
+                                writePath, pu, hivePartitionStatistics, true);
                         break;
                     default:
                         throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -377,6 +396,10 @@ public long getUpdateCnt() {
         return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
     }
 
+    public List<THivePartitionUpdate> getHivePartitionUpdates() {
+        return hivePartitionUpdates;
+    }
+
     private void convertToInsertExistingPartitionAction(
             NameMapping nameMapping,
             List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1029,6 +1052,37 @@ private void checkNoPartitionAction(NameMapping nameMapping) {
         }
     }
 
+    private void createAndAddPartition(
+            NameMapping nameMapping,
+            Table table,
+            List<String> partitionValues,
+            String writePath,
+            THivePartitionUpdate pu,
+            HivePartitionStatistics hivePartitionStatistics,
+            boolean dropFirst) {
+        StorageDescriptor sd = table.getSd();
+        // For object storage (FILE_S3), use writePath to keep the original scheme (oss://, cos://);
+        // for HDFS, use targetPath, which is the final path after rename
+        String pathForHMS = this.fileType == TFileType.FILE_S3
+                ? writePath
+                : pu.getLocation().getTargetPath();
+        HivePartition hivePartition = new HivePartition(
+                nameMapping,
+                false,
+                sd.getInputFormat(),
+                pathForHMS,
+                partitionValues,
+                Maps.newHashMap(),
+                sd.getOutputFormat(),
+                sd.getSerdeInfo().getSerializationLib(),
+                sd.getCols()
+        );
+        if (dropFirst) {
+            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+        }
+        addPartition(
+                nameMapping, hivePartition, writePath,
+                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+    }
+
     public synchronized void addPartition(
             NameMapping nameMapping,
             HivePartition partition,
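
The NEW-mode check above converts a Hive partition name into its value list before probing HMS. A simplified stand-in for HiveUtil.toPartitionValues, assuming the standard k1=v1/k2=v2 name format (the real implementation also handles Hive's character escaping):

    import java.util.ArrayList;
    import java.util.List;

    class PartitionNameSketch {
        // "year=2024/month=05" -> ["2024", "05"]
        static List<String> toPartitionValues(String partitionName) {
            List<String> values = new ArrayList<>();
            for (String part : partitionName.split("/")) {
                int eq = part.indexOf('=');
                values.add(eq < 0 ? part : part.substring(eq + 1));
            }
            return values;
        }
    }
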
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 4fbda7c19f1df1..82504111e34b5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -60,6 +60,7 @@
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
@@ -85,6 +86,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -564,6 +566,61 @@ public void invalidatePartitionCache(ExternalTable dorisTable, String partitionN
         }
     }
 
+    /**
+     * Selectively refreshes the cache for affected partitions based on update information from BE.
+     * For APPEND/OVERWRITE: invalidate both the partition cache and the file cache using the existing method.
+     * For NEW: add to the partition values cache.
+     *
+     * @param table The Hive table whose partitions were modified
+     * @param partitionUpdates List of partition updates from BE
+     */
+    public void refreshAffectedPartitions(HMSExternalTable table,
+            List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates) {
+        if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+            return;
+        }
+
+        List<String> modifiedPartitionNames = new ArrayList<>();
+        List<String> newPartitionNames = new ArrayList<>();
+
+        for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
+            String partitionName = update.getName();
+            // Skip if the partition name is null/empty (non-partitioned table case)
+            if (Strings.isNullOrEmpty(partitionName)) {
+                continue;
+            }
+
+            switch (update.getUpdateMode()) {
+                case APPEND:
+                case OVERWRITE:
+                    modifiedPartitionNames.add(partitionName);
+                    break;
+                case NEW:
+                    newPartitionNames.add(partitionName);
+                    break;
+                default:
+                    LOG.warn("Unknown update mode {} for partition {}",
+                            update.getUpdateMode(), partitionName);
+                    break;
+            }
+        }
+
+        // Invalidate cache for modified partitions (both partition cache and file cache)
+        for (String partitionName : modifiedPartitionNames) {
+            invalidatePartitionCache(table, partitionName);
+        }
+
+        // Add new partitions to the partition values cache
+        if (!newPartitionNames.isEmpty()) {
+            addPartitionsCache(table.getOrBuildNameMapping(), newPartitionNames,
+                    table.getPartitionColumnTypes(Optional.empty()));
+        }
+
+        // Log summary
+        LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
+                table.getName(), modifiedPartitionNames.size(), newPartitionNames.size());
+    }
+
     public void invalidateDbCache(String dbName) {
         long start = System.currentTimeMillis();
         Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
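
Why the two buckets are handled differently: invalidation forces a lazy reload on the next access, which is what a stale file list needs, while a truly new partition has nothing stale to drop and can simply be appended to the cached name list. A toy model of these semantics (not Doris internals):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class PartitionCacheModel {
        private final Set<String> cachedPartitionNames = new HashSet<>();    // which partitions exist
        private final Map<String, List<String>> fileCache = new HashMap<>(); // partition -> data files

        // APPEND/OVERWRITE: the partition exists; only its file list is stale.
        void onModified(String partitionName) {
            fileCache.remove(partitionName); // next read reloads lazily
        }

        // NEW: extend the name list without reloading every partition name.
        void onNew(String partitionName) {
            cachedPartitionNames.add(partitionName);
        }
    }
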
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index b1f2856b767700..1caef21851e8e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
@@ -112,14 +113,26 @@ protected void onComplete() throws UserException {
             }
             summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
             txnStatus = TransactionStatus.COMMITTED;
-            Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
-                    catalogName,
-                    table.getDatabase().getFullName(),
-                    table.getName(),
-                    true);
+
+            // Handle post-commit operations (e.g., cache refresh)
+            doAfterCommit();
         }
     }
 
+    /**
+     * Called after transaction commit.
+     * Subclasses can override this to customize post-commit behavior.
+     * Default: full table refresh.
+     */
+    protected void doAfterCommit() throws DdlException {
+        // Default: full table refresh
+        Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+                catalogName,
+                table.getDatabase().getFullName(),
+                table.getName(),
+                true);
+    }
+
     @Override
     protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
         try {
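
doAfterCommit() turns the previously hard-coded refresh into a template-method hook. A schematic override (class name hypothetical, constructor omitted; only HiveInsertExecutor overrides it in this diff):

    public class MyInsertExecutor extends BaseExternalTableInsertExecutor {
        @Override
        protected void doAfterCommit() throws DdlException {
            // Narrow the refresh to what this engine knows changed, or fall
            // back to the default full-table refresh:
            super.doAfterCommit();
        }
    }
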
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 2a423b8a26b512..fed495dbb352c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,13 +17,19 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionType;
 
@@ -31,6 +37,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -39,6 +47,8 @@
 public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
     private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
 
+    private List<THivePartitionUpdate> partitionUpdates;
+
     /**
      * constructor
      */
@@ -65,6 +75,53 @@ protected void doBeforeCommit() throws UserException {
         HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
         loadedRows = transaction.getUpdateCnt();
         transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
+
+        // Save partition updates for cache refresh after commit
+        partitionUpdates = transaction.getHivePartitionUpdates();
+    }
+
+    @Override
+    protected void doAfterCommit() throws DdlException {
+        HMSExternalTable hmsTable = (HMSExternalTable) table;
+
+        // For partitioned tables, do a selective partition refresh.
+        // For non-partitioned tables, do a full table cache invalidation.
+        List<String> affectedPartitionNames = null;
+        if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+            cache.refreshAffectedPartitions(hmsTable, partitionUpdates);
+
+            // Collect partition names for the edit log
+            affectedPartitionNames = new ArrayList<>();
+            for (THivePartitionUpdate update : partitionUpdates) {
+                String partitionName = update.getName();
+                if (partitionName != null && !partitionName.isEmpty()) {
+                    affectedPartitionNames.add(partitionName);
+                }
+            }
+        } else {
+            // Non-partitioned table or no partition updates, do a full table refresh
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
+        }
+
+        // Write an edit log to notify other FEs
+        ExternalObjectLog log;
+        if (affectedPartitionNames != null && !affectedPartitionNames.isEmpty()) {
+            // Partition-level refresh for other FEs
+            log = ExternalObjectLog.createForRefreshPartitions(
+                    hmsTable.getCatalog().getId(),
+                    table.getDatabase().getFullName(),
+                    table.getName(),
+                    affectedPartitionNames);
+        } else {
+            // Full table refresh for other FEs
+            log = ExternalObjectLog.createForRefreshTable(
+                    hmsTable.getCatalog().getId(),
+                    table.getDatabase().getFullName(),
+                    table.getName());
+        }
+        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index b9ad7ced616ebc..54dae1df135122 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,12 +21,16 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
@@ -44,10 +48,12 @@
 import org.apache.doris.thrift.THiveTableSink;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -191,18 +197,40 @@ private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
     }
 
     private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+        }
+
         List<THivePartition> partitions = new ArrayList<>();
-        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
-                ((HMSExternalCatalog) targetTable.getCatalog())
-                        .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
-        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+
+        List<HivePartition> hivePartitions;
+        if (targetTable.isPartitionedTable()) {
+            // Get partitions from the cache instead of the HMS client (similar to HiveScanNode)
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
+            HiveMetaStoreCache.HivePartitionValues partitionValues =
+                    targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
+            List<List<String>> partitionValuesList =
+                    new ArrayList<>(partitionValues.getPartitionValuesMap().values());
+            hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
+        } else {
+            // Non-partitioned table, create a dummy partition
+            hivePartitions = Lists.newArrayList();
+            StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+            HivePartition dummyPartition = new HivePartition(targetTable.getOrBuildNameMapping(), true,
+                    sd.getInputFormat(), sd.getLocation(), Lists.newArrayList(),
+                    sd.getParameters() != null ? sd.getParameters() : new HashMap<>());
+            hivePartitions.add(dummyPartition);
+        }
+
+        // Convert HivePartition to THivePartition (same logic as before)
+        for (HivePartition partition : hivePartitions) {
             THivePartition hivePartition = new THivePartition();
-            StorageDescriptor sd = partition.getSd();
-            hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+            hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
+            hivePartition.setValues(partition.getPartitionValues());
 
-            hivePartition.setValues(partition.getValues());
             THiveLocationParams locationParams = new THiveLocationParams();
-            String location = sd.getLocation();
+            String location = partition.getPath();
             // pass the same of write path and target path to partition
             locationParams.setWritePath(location);
             locationParams.setTargetPath(location);
@@ -210,7 +238,12 @@ private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
             hivePartition.setLocation(locationParams);
             partitions.add(hivePartition);
         }
+
         tSink.setPartitions(partitions);
+
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
+        }
     }
 
     private void setSerDeProperties(THiveTableSink tSink) {
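
The rewritten sink path matters for tables with many partitions: the old code issued a listPartitions() HMS call on every INSERT, while the new code reuses the same Caffeine-backed cache the scan side (HiveScanNode) uses. Schematic contrast, with identifiers taken from this diff:

    // Before: one HMS round trip per INSERT, materializing every partition.
    List<org.apache.hadoop.hive.metastore.api.Partition> fromHms =
            ((HMSExternalCatalog) targetTable.getCatalog())
                    .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());

    // After: partition values come from the table-level cache; a warm cache
    // avoids the HMS round trip entirely.
    List<HivePartition> fromCache = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
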
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index c8bc82e67a14bd..8f4adff727cb7b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,6 +28,7 @@
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
 import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.qe.ConnectContext;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -48,10 +49,13 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+
 public class HiveTableSinkTest {
 
     @Test
     public void testBindDataSink() throws UserException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setThreadLocalInfo();
 
         new MockUp<HMSExternalCatalog>() {
             @Mock
@@ -123,6 +127,11 @@ public HMSCachedClient getClient() {
 
     private void mockDifferLocationTable(String location) {
         new MockUp<HMSExternalTable>() {
+            @Mock
+            public boolean isPartitionedTable() {
+                return false;
+            }
+
             @Mock
             public Set<String> getPartitionColumnNames() {
                 return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index cc3425106a59a0..b8b3a39a2f9b25 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,6 +77,7 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
     }
 
     for (String hivePrefix : ["hive2", "hive3"]) {
+        setHivePrefix(hivePrefix)
        try {
            String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
            String catalog_name = "${hivePrefix}_test_partitions"
@@ -91,6 +92,102 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
 
            q01()
 
+            // Test the cache-miss scenario: Hive adds a partition, then Doris writes to it
+            def test_cache_miss = {
+                def dbName = "test_cache_miss_db"
+                def tblName = "test_cache_miss_table"
+
+                try {
+                    // Clean up
+                    hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+                    hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+
+                    // Create the database and a partitioned table in Hive
+                    hive_docker """CREATE DATABASE ${dbName}"""
+                    hive_docker """
+                        CREATE TABLE ${dbName}.${tblName} (
+                            id INT,
+                            name STRING
+                        )
+                        PARTITIONED BY (pt INT)
+                        STORED AS ORC
+                    """
+
+                    // Hive writes 3 partitions
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
+                        VALUES (1, 'hive_pt1')
+                    """
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
+                        VALUES (2, 'hive_pt2')
+                    """
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
+                        VALUES (3, 'hive_pt3')
+                    """
+
+                    sql """refresh catalog `${catalog_name}`"""
+                    // Doris reads data to populate its cache (it only knows about 3 partitions)
+                    def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+                    assertEquals(3, result1[0][0])
+                    logger.info("Doris cache populated with 3 partitions")
+
+                    // Hive writes a 4th partition (the Doris cache doesn't know about it)
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
+                        VALUES (4, 'hive_pt4')
+                    """
+                    logger.info("Hive added 4th partition (pt=4)")
+
+                    // Doris writes to the 4th partition.
+                    // This should trigger cache-miss detection and be treated as APPEND instead of NEW.
+                    sql """
+                        INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
+                        VALUES (40, 'doris_pt4', 4)
+                    """
+                    logger.info("Doris wrote to 4th partition (should handle cache miss)")
+
+                    // Verify: should have 5 rows total (3 from hive pt1-pt3 + 1 from hive pt4 + 1 from doris pt4)
+                    def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+                    assertEquals(5, result2[0][0])
+
+                    // Verify partition 4 has 2 rows
+                    def result3 = sql """
+                        SELECT COUNT(*) as cnt
+                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+                        WHERE pt = 4
+                    """
+                    assertEquals(2, result3[0][0])
+
+                    // Verify the data content
+                    def result4 = sql """
+                        SELECT id, name
+                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+                        WHERE pt = 4
+                        ORDER BY id
+                    """
+                    assertEquals(2, result4.size())
+                    assertEquals(4, result4[0][0])
+                    assertEquals("hive_pt4", result4[0][1])
+                    assertEquals(40, result4[1][0])
+                    assertEquals("doris_pt4", result4[1][1])
+
+                    logger.info("Cache miss test passed!")
+
+                } finally {
+                    // Clean up
+                    try {
+                        hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+                        hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+                    } catch (Exception e) {
+                        logger.warn("Cleanup failed: ${e.message}")
+                    }
+                }
+            }
+
+            test_cache_miss()
+
            qt_string_partition_table_with_comma """
            select * from partition_tables.string_partition_table_with_comma order by id;
            """