diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 2802055a8873c3..37433ef5f6b561 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,9 +26,7 @@
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.persist.OperationType;
 
 import com.google.common.base.Strings;
@@ -185,24 +183,7 @@ public void replayRefreshTable(ExternalObjectLog log) {
             db.get().unregisterTable(log.getTableName());
             db.get().resetMetaCacheNames();
         } else {
-            List<String> modifiedPartNames = log.getPartitionNames();
-            List<String> newPartNames = log.getNewPartitionNames();
-            if (catalog instanceof HMSExternalCatalog
-                    && ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
-                    || (newPartNames != null && !newPartNames.isEmpty()))) {
-                // Partition-level cache invalidation, only for hive catalog
-                HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                        .getMetaStoreCache((HMSExternalCatalog) catalog);
-                cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
-                LOG.info("replay refresh partitions for table {}, "
-                        + "modified partitions count: {}, "
-                        + "new partitions count: {}",
-                        table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
-                        newPartNames == null ? 0 : newPartNames.size());
-            } else {
-                // Full table cache invalidation
-                refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
-            }
+            refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 87516a19a776a6..1410f9618811e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -83,7 +83,6 @@ public class SummaryProfile {
     public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
     public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
     public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
-    public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
     public static final String PLAN_TIME = "Plan Time";
     public static final String SCHEDULE_TIME = "Schedule Time";
     public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -159,7 +158,6 @@ public class SummaryProfile {
             GET_SPLITS_TIME,
             GET_PARTITIONS_TIME,
             GET_PARTITION_FILES_TIME,
-            SINK_SET_PARTITION_VALUES_TIME,
             CREATE_SCAN_RANGE_TIME,
             DISTRIBUTE_TIME,
             GET_META_VERSION_TIME,
@@ -210,7 +208,6 @@ public class SummaryProfile {
             .put(NEREIDS_BE_FOLD_CONST_TIME, 2)
             .put(GET_PARTITIONS_TIME, 3)
             .put(GET_PARTITION_FILES_TIME, 3)
-            .put(SINK_SET_PARTITION_VALUES_TIME, 3)
             .put(CREATE_SCAN_RANGE_TIME, 2)
             .put(GET_PARTITION_VERSION_TIME, 1)
             .put(GET_PARTITION_VERSION_COUNT, 1)
@@ -280,10 +277,6 @@ public class SummaryProfile {
     private long getPartitionsFinishTime = -1;
     @SerializedName(value = "getPartitionFilesFinishTime")
     private long getPartitionFilesFinishTime = -1;
-    @SerializedName(value = "sinkSetPartitionValuesStartTime")
-    private long sinkSetPartitionValuesStartTime = -1;
-    @SerializedName(value = "sinkSetPartitionValuesFinishTime")
-    private long sinkSetPartitionValuesFinishTime = -1;
     @SerializedName(value = "getSplitsFinishTime")
     private long getSplitsFinishTime = -1;
     @SerializedName(value = "createScanRangeFinishTime")
@@ -470,8 +463,6 @@ private void updateExecutionSummaryProfile() {
                 getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
                 getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
-        executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
-                getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
                 getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -610,14 +601,6 @@ public void setGetPartitionsFinishTime() {
         this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
     }
 
-    public void setSinkGetPartitionsStartTime() {
-        this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
-    }
-
-    public void setSinkGetPartitionsFinishTime() {
-        this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
-    }
-
     public void setGetPartitionFilesFinishTime() {
         this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index 43a0675d783c41..cf7872f8b39c26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -58,9 +58,6 @@ public class ExternalObjectLog implements Writable {
     @SerializedName(value = "partitionNames")
     private List<String> partitionNames;
 
-    @SerializedName(value = "newPartitionNames")
-    private List<String> newPartitionNames;
-
     @SerializedName(value = "lastUpdateTime")
     private long lastUpdateTime;
 
@@ -83,17 +80,6 @@ public static ExternalObjectLog createForRefreshTable(long catalogId, String dbN
         return externalObjectLog;
     }
 
-    public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
-            List<String> modifiedPartNames, List<String> newPartNames) {
-        ExternalObjectLog externalObjectLog = new ExternalObjectLog();
-        externalObjectLog.setCatalogId(catalogId);
-        externalObjectLog.setDbName(dbName);
-        externalObjectLog.setTableName(tblName);
-        externalObjectLog.setPartitionNames(modifiedPartNames);
-        externalObjectLog.setNewPartitionNames(newPartNames);
-        return externalObjectLog;
-    }
-
     public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
             String newTblName) {
         ExternalObjectLog externalObjectLog = new ExternalObjectLog();
@@ -138,12 +124,6 @@ public String debugForRefreshTable() {
         } else {
             sb.append("tableId: " + tableId + "]");
         }
-        if (partitionNames != null && !partitionNames.isEmpty()) {
-            sb.append(", partitionNames: " + partitionNames);
-        }
-        if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
-            sb.append(", newPartitionNames: " + newPartitionNames);
-        }
         return sb.toString();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 182e214b00ca34..2864f146f03c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,7 +47,6 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -269,48 +268,30 @@ public void finishInsertTable(NameMapping nameMapping) {
                         insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
                         break;
                     case NEW:
-                        // Check if partition really exists in HMS (may be cache miss in Doris)
-                        String partitionName = pu.getName();
-                        if (Strings.isNullOrEmpty(partitionName)) {
-                            // This should not happen for partitioned tables
-                            LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
-                            break;
-                        }
-                        List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
-                        boolean existsInHMS = false;
-                        try {
-                            Partition hmsPartition = hiveOps.getClient().getPartition(
-                                    nameMapping.getRemoteDbName(),
-                                    nameMapping.getRemoteTblName(),
-                                    partitionValues);
-                            existsInHMS = (hmsPartition != null);
-                        } catch (Exception e) {
-                            // Partition not found in HMS, treat as truly new
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
-                            }
-                        }
-
-                        if (existsInHMS) {
-                            // Partition exists in HMS but not in Doris cache
-                            // Treat as APPEND instead of NEW to avoid creation error
-                            LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
-                                    pu.getName());
-                            insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
-                        } else {
-                            // Truly new partition, create it
-                            createAndAddPartition(nameMapping, table, partitionValues, writePath,
-                                    pu, hivePartitionStatistics, false);
-                        }
-                        break;
                     case OVERWRITE:
-                        String overwritePartitionName = pu.getName();
-                        if (Strings.isNullOrEmpty(overwritePartitionName)) {
-                            LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
-                            break;
+                        StorageDescriptor sd = table.getSd();
+                        // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
+                        // For HDFS, use targetPath which is the final path after rename
+                        String pathForHMS = this.fileType == TFileType.FILE_S3
+                                ? writePath
+                                : pu.getLocation().getTargetPath();
+                        HivePartition hivePartition = new HivePartition(
+                                nameMapping,
+                                false,
+                                sd.getInputFormat(),
+                                pathForHMS,
+                                HiveUtil.toPartitionValues(pu.getName()),
+                                Maps.newHashMap(),
+                                sd.getOutputFormat(),
+                                sd.getSerdeInfo().getSerializationLib(),
+                                sd.getCols()
+                        );
+                        if (updateMode == TUpdateMode.OVERWRITE) {
+                            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
                         }
-                        createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
-                                writePath, pu, hivePartitionStatistics, true);
+                        addPartition(
+                                nameMapping, hivePartition, writePath,
+                                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
                         break;
                     default:
                         throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -396,10 +377,6 @@ public long getUpdateCnt() {
         return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
     }
 
-    public List<THivePartitionUpdate> getHivePartitionUpdates() {
-        return hivePartitionUpdates;
-    }
-
     private void convertToInsertExistingPartitionAction(
             NameMapping nameMapping,
             List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1051,37 +1028,6 @@ private void checkNoPartitionAction(NameMapping nameMapping) {
         }
     }
 
-    private void createAndAddPartition(
-            NameMapping nameMapping,
-            Table table,
-            List<String> partitionValues,
-            String writePath,
-            THivePartitionUpdate pu,
-            HivePartitionStatistics hivePartitionStatistics,
-            boolean dropFirst) {
-        StorageDescriptor sd = table.getSd();
-        String pathForHMS = this.fileType == TFileType.FILE_S3
-                ? writePath
-                : pu.getLocation().getTargetPath();
-        HivePartition hivePartition = new HivePartition(
-                nameMapping,
-                false,
-                sd.getInputFormat(),
-                pathForHMS,
-                partitionValues,
-                Maps.newHashMap(),
-                sd.getOutputFormat(),
-                sd.getSerdeInfo().getSerializationLib(),
-                sd.getCols()
-        );
-        if (dropFirst) {
-            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
-        }
-        addPartition(
-                nameMapping, hivePartition, writePath,
-                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
-    }
-
     public synchronized void addPartition(
             NameMapping nameMapping,
             HivePartition partition,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d1d6f6fc563e40..7180551f64a4f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -60,7 +60,6 @@
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
@@ -562,67 +561,6 @@ public void invalidatePartitionCache(ExternalTable dorisTable, String partitionN
         }
     }
 
-    /**
-     * Selectively refreshes cache for affected partitions based on update information from BE.
-     * For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
-     * For NEW: add to partition values cache.
-     *
-     * @param table The Hive table whose partitions were modified
-     * @param partitionUpdates List of partition updates from BE
-     * @param modifiedPartNames Output list to collect names of modified partitions
-     * @param newPartNames Output list to collect names of new partitions
-     */
-    public void refreshAffectedPartitions(HMSExternalTable table,
-            List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
-            List<String> modifiedPartNames, List<String> newPartNames) {
-        if (partitionUpdates == null || partitionUpdates.isEmpty()) {
-            return;
-        }
-
-        for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
-            String partitionName = update.getName();
-            // Skip if partition name is null/empty (non-partitioned table case)
-            if (Strings.isNullOrEmpty(partitionName)) {
-                continue;
-            }
-
-            switch (update.getUpdateMode()) {
-                case APPEND:
-                case OVERWRITE:
-                    modifiedPartNames.add(partitionName);
-                    break;
-                case NEW:
-                    newPartNames.add(partitionName);
-                    break;
-                default:
-                    LOG.warn("Unknown update mode {} for partition {}",
-                            update.getUpdateMode(), partitionName);
-                    break;
-            }
-        }
-
-        refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
-    }
-
-    public void refreshAffectedPartitionsCache(HMSExternalTable table,
-            List<String> modifiedPartNames, List<String> newPartNames) {
-
-        // Invalidate cache for modified partitions (both partition cache and file cache)
-        for (String partitionName : modifiedPartNames) {
-            invalidatePartitionCache(table, partitionName);
-        }
-
-        // Add new partitions to partition values cache
-        if (!newPartNames.isEmpty()) {
-            addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
-                    table.getPartitionColumnTypes(Optional.empty()));
-        }
-
-        // Log summary
-        LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
-                table.getName(), modifiedPartNames.size(), newPartNames.size());
-    }
-
     public void invalidateDbCache(String dbName) {
         long start = System.currentTimeMillis();
         Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index f82fefd89f213c..93693233e1168e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
@@ -111,26 +110,14 @@ protected void onComplete() throws UserException {
             }
             summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
             txnStatus = TransactionStatus.COMMITTED;
-
-            // Handle post-commit operations (e.g., cache refresh)
-            doAfterCommit();
+            Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+                    catalogName,
+                    table.getDatabase().getFullName(),
+                    table.getName(),
+                    true);
         }
     }
 
-    /**
-     * Called after transaction commit.
-     * Subclasses can override this to customize post-commit behavior.
-     * Default: full table refresh.
-     */
-    protected void doAfterCommit() throws DdlException {
-        // Default: full table refresh
-        Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
-                catalogName,
-                table.getDatabase().getFullName(),
-                table.getName(),
-                true);
-    }
-
     @Override
     protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index f9345a4495caac..64f68454d8449d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,28 +17,20 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSTransaction;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -47,8 +39,6 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
 
     private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
 
-    private List<THivePartitionUpdate> partitionUpdates;
-
     /**
      * constructor
      */
@@ -75,46 +65,6 @@ protected void doBeforeCommit() throws UserException {
         HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
         loadedRows = transaction.getUpdateCnt();
         transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
-
-        // Save partition updates for cache refresh after commit
-        partitionUpdates = transaction.getHivePartitionUpdates();
-    }
-
-    @Override
-    protected void doAfterCommit() throws DdlException {
-        HMSExternalTable hmsTable = (HMSExternalTable) table;
-
-        // For partitioned tables, do selective partition refresh
-        // For non-partitioned tables, do full table cache invalidation
-        List<String> modifiedPartNames = Lists.newArrayList();
-        List<String> newPartNames = Lists.newArrayList();
-        if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
-            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
-            cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
-        } else {
-            // Non-partitioned table or no partition updates, do full table refresh
-            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
-        }
-
-        // Write edit log to notify other FEs
-        ExternalObjectLog log;
-        if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
-            // Partition-level refresh for other FEs
-            log = ExternalObjectLog.createForRefreshPartitions(
-                    hmsTable.getCatalog().getId(),
-                    table.getDatabase().getFullName(),
-                    table.getName(),
-                    modifiedPartNames,
-                    newPartNames);
-        } else {
-            // Full table refresh for other FEs
-            log = ExternalObjectLog.createForRefreshTable(
-                    hmsTable.getCatalog().getId(),
-                    table.getDatabase().getFullName(),
-                    table.getName());
-        }
-        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 2ae4ae8e397872..68a0edc430f234 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,16 +21,12 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
-import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
-import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
@@ -195,32 +191,18 @@ private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
     }
 
     private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
-        if (ConnectContext.get().getExecutor() != null) {
-            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
-        }
-
         List<THivePartition> partitions = new ArrayList<>();
-
-        List<HivePartition> hivePartitions = new ArrayList<>();
-        if (targetTable.isPartitionedTable()) {
-            // Get partitions from cache instead of HMS client (similar to HiveScanNode)
-            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                    .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
-            HiveMetaStoreCache.HivePartitionValues partitionValues =
-                    targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
-            List<List<String>> partitionValuesList =
-                    new ArrayList<>(partitionValues.getPartitionValuesMap().values());
-            hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
-        }
-
-        // Convert HivePartition to THivePartition (same logic as before)
-        for (HivePartition partition : hivePartitions) {
+        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
+                ((HMSExternalCatalog) targetTable.getCatalog())
+                        .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
+        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
             THivePartition hivePartition = new THivePartition();
-            hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
-            hivePartition.setValues(partition.getPartitionValues());
+            StorageDescriptor sd = partition.getSd();
+            hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+            hivePartition.setValues(partition.getValues());
             THiveLocationParams locationParams = new THiveLocationParams();
-            String location = partition.getPath();
+            String location = sd.getLocation();
             // pass the same of write path and target path to partition
             locationParams.setWritePath(location);
             locationParams.setTargetPath(location);
@@ -228,12 +210,7 @@ private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
             hivePartition.setLocation(locationParams);
             partitions.add(hivePartition);
         }
-
         tSink.setPartitions(partitions);
-
-        if (ConnectContext.get().getExecutor() != null) {
-            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
-        }
     }
 
     private void setSerDeProperties(THiveTableSink tSink) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index 8f4adff727cb7b..c8bc82e67a14bd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,7 +28,6 @@
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
 import org.apache.doris.datasource.property.storage.StorageProperties;
-import org.apache.doris.qe.ConnectContext;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -49,13 +48,10 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-
 public class HiveTableSinkTest {
 
     @Test
     public void testBindDataSink() throws UserException {
-        ConnectContext ctx = new ConnectContext();
-        ctx.setThreadLocalInfo();
 
         new MockUp() {
             @Mock
@@ -127,11 +123,6 @@ public HMSCachedClient getClient() {
 
     private void mockDifferLocationTable(String location) {
         new MockUp<HMSExternalTable>() {
-            @Mock
-            public boolean isPartitionedTable() {
-                return false;
-            }
-
             @Mock
             public Set<String> getPartitionColumnNames() {
                 return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index b8b3a39a2f9b25..cc3425106a59a0 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,7 +77,6 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
     }
 
     for (String hivePrefix : ["hive2", "hive3"]) {
-        setHivePrefix(hivePrefix)
         try {
            String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
            String catalog_name = "${hivePrefix}_test_partitions"
@@ -92,102 +91,6 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
 
             q01()
 
-            // Test cache miss scenario: Hive adds partition, then Doris writes to it
-            def test_cache_miss = {
-                def dbName = "test_cache_miss_db"
-                def tblName = "test_cache_miss_table"
-
-                try {
-                    // Clean up
-                    hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
-                    hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
-
-                    // Create database and partitioned table in Hive
-                    hive_docker """CREATE DATABASE ${dbName}"""
-                    hive_docker """
-                        CREATE TABLE ${dbName}.${tblName} (
-                            id INT,
-                            name STRING
-                        )
-                        PARTITIONED BY (pt INT)
-                        STORED AS ORC
-                    """
-
-                    // Hive writes 3 partitions
-                    hive_docker """
-                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
-                        VALUES (1, 'hive_pt1')
-                    """
-                    hive_docker """
-                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
-                        VALUES (2, 'hive_pt2')
-                    """
-                    hive_docker """
-                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
-                        VALUES (3, 'hive_pt3')
-                    """
-
-                    sql """refresh catalog `${catalog_name}`"""
-                    // Doris reads data to populate cache (only knows about 3 partitions)
-                    def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
-                    assertEquals(3, result1[0][0])
-                    logger.info("Doris cache populated with 3 partitions")
-
-                    // Hive writes 4th partition (Doris cache doesn't know about it)
-                    hive_docker """
-                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
-                        VALUES (4, 'hive_pt4')
-                    """
-                    logger.info("Hive added 4th partition (pt=4)")
-
-                    // Doris writes to the 4th partition
-                    // This should trigger cache miss detection and treat as APPEND instead of NEW
-                    sql """
-                        INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
-                        VALUES (40, 'doris_pt4', 4)
-                    """
-                    logger.info("Doris wrote to 4th partition (should handle cache miss)")
-
-                    // Verify: should have 5 rows total (3 from hive + 1 from hive pt4 + 1 from doris pt4)
-                    def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
-                    assertEquals(5, result2[0][0])
-
-                    // Verify partition 4 has 2 rows
-                    def result3 = sql """
-                        SELECT COUNT(*) as cnt
-                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
-                        WHERE pt = 4
-                    """
-                    assertEquals(2, result3[0][0])
-
-                    // Verify data content
-                    def result4 = sql """
-                        SELECT id, name
-                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
-                        WHERE pt = 4
-                        ORDER BY id
-                    """
-                    assertEquals(2, result4.size())
-                    assertEquals(4, result4[0][0])
-                    assertEquals("hive_pt4", result4[0][1])
-                    assertEquals(40, result4[1][0])
-                    assertEquals("doris_pt4", result4[1][1])
-
-                    logger.info("Cache miss test passed!")
-
-                } finally {
-                    // Clean up
-                    try {
-                        hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
-                        hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
-                    } catch (Exception e) {
-                        logger.warn("Cleanup failed: ${e.message}")
-                    }
-                }
-            }
-
-            test_cache_miss()
-
             qt_string_partition_table_with_comma """
                 select * from partition_tables.string_partition_table_with_comma order by id;
             """