Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.persist.OperationType;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -183,13 +185,20 @@ public void replayRefreshTable(ExternalObjectLog log) {
db.get().unregisterTable(log.getTableName());
db.get().resetMetaCacheNames();
} else {
List<String> partitionNames = log.getPartitionNames();
if (partitionNames != null && !partitionNames.isEmpty()) {
// Partition-level cache invalidation
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidatePartitionsCache(table.get(), partitionNames);
LOG.info("replay refresh partitions for table {}, partitions count: {}",
table.get().getName(), partitionNames.size());
List<String> modifiedPartNames = log.getPartitionNames();
List<String> newPartNames = log.getNewPartitionNames();
if (catalog instanceof HMSExternalCatalog
&& ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
|| (newPartNames != null && !newPartNames.isEmpty()))) {
// Partition-level cache invalidation, only for hive catalog
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) catalog);
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
LOG.info("replay refresh partitions for table {}, "
+ "modified partitions count: {}, "
+ "new partitions count: {}",
table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
newPartNames == null ? 0 : newPartNames.size());
} else {
// Full table cache invalidation
refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "partitionNames")
private List<String> partitionNames;

@SerializedName(value = "newPartitionNames")
private List<String> newPartitionNames;

@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime;

Expand All @@ -81,12 +84,13 @@ public static ExternalObjectLog createForRefreshTable(long catalogId, String dbN
}

public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
List<String> partitionNames) {
List<String> modifiedPartNames, List<String> newPartNames) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
externalObjectLog.setCatalogId(catalogId);
externalObjectLog.setDbName(dbName);
externalObjectLog.setTableName(tblName);
externalObjectLog.setPartitionNames(partitionNames);
externalObjectLog.setPartitionNames(modifiedPartNames);
externalObjectLog.setNewPartitionNames(newPartNames);
return externalObjectLog;
}

Expand Down Expand Up @@ -134,6 +138,12 @@ public String debugForRefreshTable() {
} else {
sb.append("tableId: " + tableId + "]");
}
if (partitionNames != null && !partitionNames.isEmpty()) {
sb.append(", partitionNames: " + partitionNames);
}
if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
sb.append(", newPartitionNames: " + newPartitionNames);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -573,16 +573,16 @@ public void invalidatePartitionCache(ExternalTable dorisTable, String partitionN
*
* @param table The Hive table whose partitions were modified
* @param partitionUpdates List of partition updates from BE
* @param modifiedPartNames Output list to collect names of modified partitions
* @param newPartNames Output list to collect names of new partitions
*/
public void refreshAffectedPartitions(HMSExternalTable table,
List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates) {
List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
List<String> modifiedPartNames, List<String> newPartNames) {
if (partitionUpdates == null || partitionUpdates.isEmpty()) {
return;
}

List<String> modifiedPartitionNames = new ArrayList<>();
List<String> newPartitionNames = new ArrayList<>();

for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
String partitionName = update.getName();
// Skip if partition name is null/empty (non-partitioned table case)
Expand All @@ -593,10 +593,10 @@ public void refreshAffectedPartitions(HMSExternalTable table,
switch (update.getUpdateMode()) {
case APPEND:
case OVERWRITE:
modifiedPartitionNames.add(partitionName);
modifiedPartNames.add(partitionName);
break;
case NEW:
newPartitionNames.add(partitionName);
newPartNames.add(partitionName);
break;
default:
LOG.warn("Unknown update mode {} for partition {}",
Expand All @@ -605,20 +605,26 @@ public void refreshAffectedPartitions(HMSExternalTable table,
}
}

refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
}

public void refreshAffectedPartitionsCache(HMSExternalTable table,
List<String> modifiedPartNames, List<String> newPartNames) {

// Invalidate cache for modified partitions (both partition cache and file cache)
for (String partitionName : modifiedPartitionNames) {
for (String partitionName : modifiedPartNames) {
invalidatePartitionCache(table, partitionName);
}

// Add new partitions to partition values cache
if (!newPartitionNames.isEmpty()) {
addPartitionsCache(table.getOrBuildNameMapping(), newPartitionNames,
if (!newPartNames.isEmpty()) {
addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
table.getPartitionColumnTypes(Optional.empty()));
}

// Log summary
LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
table.getName(), modifiedPartitionNames.size(), newPartitionNames.size());
table.getName(), modifiedPartNames.size(), newPartNames.size());
}

public void invalidateDbCache(String dbName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import org.apache.doris.transaction.TransactionType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -86,34 +86,27 @@ protected void doAfterCommit() throws DdlException {

// For partitioned tables, do selective partition refresh
// For non-partitioned tables, do full table cache invalidation
List<String> affectedPartitionNames = null;
List<String> modifiedPartNames = Lists.newArrayList();
List<String> newPartNames = Lists.newArrayList();
if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
cache.refreshAffectedPartitions(hmsTable, partitionUpdates);

// Collect partition names for edit log
affectedPartitionNames = new ArrayList<>();
for (THivePartitionUpdate update : partitionUpdates) {
String partitionName = update.getName();
if (partitionName != null && !partitionName.isEmpty()) {
affectedPartitionNames.add(partitionName);
}
}
cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
} else {
// Non-partitioned table or no partition updates, do full table refresh
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
}

// Write edit log to notify other FEs
ExternalObjectLog log;
if (affectedPartitionNames != null && !affectedPartitionNames.isEmpty()) {
if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
// Partition-level refresh for other FEs
log = ExternalObjectLog.createForRefreshPartitions(
hmsTable.getCatalog().getId(),
table.getDatabase().getFullName(),
table.getName(),
affectedPartitionNames);
modifiedPartNames,
newPartNames);
} else {
// Full table refresh for other FEs
log = ExternalObjectLog.createForRefreshTable(
Expand Down
Loading