@@ -26,9 +26,7 @@
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.persist.OperationType;

import com.google.common.base.Strings;
@@ -185,24 +183,7 @@ public void replayRefreshTable(ExternalObjectLog log) {
db.get().unregisterTable(log.getTableName());
db.get().resetMetaCacheNames();
} else {
List<String> modifiedPartNames = log.getPartitionNames();
List<String> newPartNames = log.getNewPartitionNames();
if (catalog instanceof HMSExternalCatalog
&& ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
|| (newPartNames != null && !newPartNames.isEmpty()))) {
// Partition-level cache invalidation, only for hive catalog
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) catalog);
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
LOG.info("replay refresh partitions for table {}, "
+ "modified partitions count: {}, "
+ "new partitions count: {}",
table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
newPartNames == null ? 0 : newPartNames.size());
} else {
// Full table cache invalidation
refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
}
refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
}
}

@@ -83,7 +83,6 @@ public class SummaryProfile {
public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
public static final String PLAN_TIME = "Plan Time";
public static final String SCHEDULE_TIME = "Schedule Time";
public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -159,7 +158,6 @@ public class SummaryProfile {
GET_SPLITS_TIME,
GET_PARTITIONS_TIME,
GET_PARTITION_FILES_TIME,
SINK_SET_PARTITION_VALUES_TIME,
CREATE_SCAN_RANGE_TIME,
DISTRIBUTE_TIME,
GET_META_VERSION_TIME,
@@ -210,7 +208,6 @@ public class SummaryProfile {
.put(NEREIDS_BE_FOLD_CONST_TIME, 2)
.put(GET_PARTITIONS_TIME, 3)
.put(GET_PARTITION_FILES_TIME, 3)
.put(SINK_SET_PARTITION_VALUES_TIME, 3)
.put(CREATE_SCAN_RANGE_TIME, 2)
.put(GET_PARTITION_VERSION_TIME, 1)
.put(GET_PARTITION_VERSION_COUNT, 1)
@@ -280,10 +277,6 @@ public class SummaryProfile {
private long getPartitionsFinishTime = -1;
@SerializedName(value = "getPartitionFilesFinishTime")
private long getPartitionFilesFinishTime = -1;
@SerializedName(value = "sinkSetPartitionValuesStartTime")
private long sinkSetPartitionValuesStartTime = -1;
@SerializedName(value = "sinkSetPartitionValuesFinishTime")
private long sinkSetPartitionValuesFinishTime = -1;
@SerializedName(value = "getSplitsFinishTime")
private long getSplitsFinishTime = -1;
@SerializedName(value = "createScanRangeFinishTime")
@@ -470,8 +463,6 @@ private void updateExecutionSummaryProfile() {
getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -610,14 +601,6 @@ public void setGetPartitionsFinishTime() {
this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
}

public void setSinkGetPartitionsStartTime() {
this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
}

public void setSinkGetPartitionsFinishTime() {
this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
}

public void setGetPartitionFilesFinishTime() {
this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
}
@@ -58,9 +58,6 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "partitionNames")
private List<String> partitionNames;

@SerializedName(value = "newPartitionNames")
private List<String> newPartitionNames;

@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime;

@@ -83,17 +80,6 @@ public static ExternalObjectLog createForRefreshTable(long catalogId, String dbN
return externalObjectLog;
}

public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
List<String> modifiedPartNames, List<String> newPartNames) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
externalObjectLog.setCatalogId(catalogId);
externalObjectLog.setDbName(dbName);
externalObjectLog.setTableName(tblName);
externalObjectLog.setPartitionNames(modifiedPartNames);
externalObjectLog.setNewPartitionNames(newPartNames);
return externalObjectLog;
}

public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
String newTblName) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
@@ -138,12 +124,6 @@ public String debugForRefreshTable() {
} else {
sb.append("tableId: " + tableId + "]");
}
if (partitionNames != null && !partitionNames.isEmpty()) {
sb.append(", partitionNames: " + partitionNames);
}
if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
sb.append(", newPartitionNames: " + newPartitionNames);
}
return sb.toString();
}
}
@@ -47,7 +47,6 @@
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -269,48 +268,30 @@ public void finishInsertTable(NameMapping nameMapping) {
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
break;
case NEW:
// Check if partition really exists in HMS (may be cache miss in Doris)
String partitionName = pu.getName();
if (Strings.isNullOrEmpty(partitionName)) {
// This should not happen for partitioned tables
LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
break;
}
List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
boolean existsInHMS = false;
try {
Partition hmsPartition = hiveOps.getClient().getPartition(
nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName(),
partitionValues);
existsInHMS = (hmsPartition != null);
} catch (Exception e) {
// Partition not found in HMS, treat as truly new
if (LOG.isDebugEnabled()) {
LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
}
}

if (existsInHMS) {
// Partition exists in HMS but not in Doris cache
// Treat as APPEND instead of NEW to avoid creation error
LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
pu.getName());
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
} else {
// Truly new partition, create it
createAndAddPartition(nameMapping, table, partitionValues, writePath,
pu, hivePartitionStatistics, false);
}
break;
case OVERWRITE:
String overwritePartitionName = pu.getName();
if (Strings.isNullOrEmpty(overwritePartitionName)) {
LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
break;
StorageDescriptor sd = table.getSd();
// For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
// For HDFS, use targetPath which is the final path after rename
String pathForHMS = this.fileType == TFileType.FILE_S3
? writePath
: pu.getLocation().getTargetPath();
HivePartition hivePartition = new HivePartition(
nameMapping,
false,
sd.getInputFormat(),
pathForHMS,
HiveUtil.toPartitionValues(pu.getName()),
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
sd.getCols()
);
if (updateMode == TUpdateMode.OVERWRITE) {
dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
}
createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
writePath, pu, hivePartitionStatistics, true);
addPartition(
nameMapping, hivePartition, writePath,
pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -396,10 +377,6 @@ public long getUpdateCnt() {
return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
}

public List<THivePartitionUpdate> getHivePartitionUpdates() {
return hivePartitionUpdates;
}

private void convertToInsertExistingPartitionAction(
NameMapping nameMapping,
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1051,37 +1028,6 @@ private void checkNoPartitionAction(NameMapping nameMapping) {
}
}

private void createAndAddPartition(
NameMapping nameMapping,
Table table,
List<String> partitionValues,
String writePath,
THivePartitionUpdate pu,
HivePartitionStatistics hivePartitionStatistics,
boolean dropFirst) {
StorageDescriptor sd = table.getSd();
String pathForHMS = this.fileType == TFileType.FILE_S3
? writePath
: pu.getLocation().getTargetPath();
HivePartition hivePartition = new HivePartition(
nameMapping,
false,
sd.getInputFormat(),
pathForHMS,
partitionValues,
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
sd.getCols()
);
if (dropFirst) {
dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
}
addPartition(
nameMapping, hivePartition, writePath,
pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
}

public synchronized void addPartition(
NameMapping nameMapping,
HivePartition partition,
@@ -60,7 +60,6 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
@@ -562,67 +561,6 @@ public void invalidatePartitionCache(ExternalTable dorisTable, String partitionN
}
}

/**
* Selectively refreshes cache for affected partitions based on update information from BE.
* For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
* For NEW: add to partition values cache.
*
* @param table The Hive table whose partitions were modified
* @param partitionUpdates List of partition updates from BE
* @param modifiedPartNames Output list to collect names of modified partitions
* @param newPartNames Output list to collect names of new partitions
*/
public void refreshAffectedPartitions(HMSExternalTable table,
List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
List<String> modifiedPartNames, List<String> newPartNames) {
if (partitionUpdates == null || partitionUpdates.isEmpty()) {
return;
}

for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
String partitionName = update.getName();
// Skip if partition name is null/empty (non-partitioned table case)
if (Strings.isNullOrEmpty(partitionName)) {
continue;
}

switch (update.getUpdateMode()) {
case APPEND:
case OVERWRITE:
modifiedPartNames.add(partitionName);
break;
case NEW:
newPartNames.add(partitionName);
break;
default:
LOG.warn("Unknown update mode {} for partition {}",
update.getUpdateMode(), partitionName);
break;
}
}

refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
}

public void refreshAffectedPartitionsCache(HMSExternalTable table,
List<String> modifiedPartNames, List<String> newPartNames) {

// Invalidate cache for modified partitions (both partition cache and file cache)
for (String partitionName : modifiedPartNames) {
invalidatePartitionCache(table, partitionName);
}

// Add new partitions to partition values cache
if (!newPartNames.isEmpty()) {
addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
table.getPartitionColumnTypes(Optional.empty()));
}

// Log summary
LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
table.getName(), modifiedPartNames.size(), newPartNames.size());
}

public void invalidateDbCache(String dbName) {
long start = System.currentTimeMillis();
Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
@@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
@@ -111,26 +110,14 @@ protected void onComplete() throws UserException {
}
summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
txnStatus = TransactionStatus.COMMITTED;

// Handle post-commit operations (e.g., cache refresh)
doAfterCommit();
Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
catalogName,
table.getDatabase().getFullName(),
table.getName(),
true);
}
}

/**
* Called after transaction commit.
* Subclasses can override this to customize post-commit behavior.
* Default: full table refresh.
*/
protected void doAfterCommit() throws DdlException {
// Default: full table refresh
Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
catalogName,
table.getDatabase().getFullName(),
table.getName(),
true);
}

@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
try {