diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 955bfd4279fd5c..825bdef9f09819 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -59,6 +59,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -363,7 +364,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() { * @return mvPartitionName ==> mvPartitionKeyDesc */ public Map generateMvPartitionDescs() throws AnalysisException { - Map mtmvItems = getAndCopyPartitionItems(); + Map mtmvItems = getAndCopyPartitionItems(OptionalLong.empty()); Map result = Maps.newHashMap(); for (Entry entry : mtmvItems.entrySet()) { result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc()); @@ -392,7 +393,7 @@ public Pair>, Map> calculateDoublyPartit Map baseToMv = Maps.newHashMap(); Map> relatedPartitionDescs = MTMVPartitionUtil .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties); - Map mvPartitionItems = getAndCopyPartitionItems(); + Map mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty()); for (Entry entry : mvPartitionItems.entrySet()) { Set basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()); @@ -425,7 +426,7 @@ public Map> calculatePartitionMappings() throws AnalysisExce Map> res = Maps.newHashMap(); Map> relatedPartitionDescs = MTMVPartitionUtil .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties); - Map mvPartitionItems = getAndCopyPartitionItems(); + Map mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty()); for (Entry entry : mvPartitionItems.entrySet()) { res.put(entry.getKey(), relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 8a9ce4a2a2a545..547a520a061e63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -110,6 +110,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -3260,7 +3261,7 @@ public PartitionType getPartitionType() { } @Override - public Map getAndCopyPartitionItems() throws AnalysisException { + public Map getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException { if (!tryReadLock(1, TimeUnit.MINUTES)) { throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName()); } @@ -3284,7 +3285,8 @@ public List getPartitionColumns() { } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + OptionalLong snapshotId) throws AnalysisException { Map partitionVersions = context.getBaseVersions().getPartitionVersions(); long partitionId = getPartitionOrAnalysisException(partitionName).getId(); @@ -3294,7 +3296,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) { Map tableVersions = context.getBaseVersions().getTableVersions(); long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion, id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index aacd9268ae35cf..98984467d75b5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -84,6 +84,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -748,7 +749,7 @@ public Set getPartitionColumnNames() { } @Override - public Map getAndCopyPartitionItems() { + public Map getAndCopyPartitionItems(OptionalLong snapshotId) { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -763,8 +764,8 @@ public Map getAndCopyPartitionItems() { } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) - throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + OptionalLong snapshotId) throws AnalysisException { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -776,7 +777,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) + throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 5645c4e89e726c..632a0da0ebd316 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -67,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -312,7 +313,7 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { } @Override - public Map getAndCopyPartitionItems() { + public Map getAndCopyPartitionItems(OptionalLong snapshotId) { return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); } @@ -333,7 +334,8 @@ public List getPartitionColumns() { } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + OptionalLong snapshotId) throws AnalysisException { PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); if (paimonPartition == null) { @@ -343,7 +345,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) + throws AnalysisException { return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 1cfb5e021a5309..1bbc51fb004c57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.OptionalLong; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -329,7 +330,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty()); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -446,7 +447,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, OptionalLong.empty()); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } @@ -482,7 +483,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty()); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -497,7 +498,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres continue; } refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, - ((MTMVRelatedTableIf) table).getTableSnapshot(context)); + ((MTMVRelatedTableIf) table).getTableSnapshot(context, OptionalLong.empty())); } return refreshPartitionSnapshot; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java index 13b58239376116..ef3100dec4c732 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java @@ -20,6 +20,7 @@ import org.apache.doris.common.AnalysisException; import java.util.Map; +import java.util.OptionalLong; /** * get all related partition descs @@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti @Override public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); + lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(OptionalLong.empty())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 4a8b14603ce4d6..e18784ae253a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; /** @@ -38,9 +39,10 @@ public interface MTMVRelatedTableIf extends TableIf { * Note: This method is called every time there is a refresh and transparent rewrite, * so if this method is slow, it will significantly reduce query performance * + * @param snapshotId * @return partitionName->PartitionItem */ - Map getAndCopyPartitionItems() throws AnalysisException; + Map getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException; /** * getPartitionType LIST/RANGE/UNPARTITIONED @@ -70,12 +72,14 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshotId * @param partitionName * @param context * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, OptionalLong snapshotId) + throws AnalysisException; /** * getTableSnapshot @@ -83,11 +87,12 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshotId * @param context * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) throws AnalysisException; /** * Does the current type of table allow timed triggering diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 997385742dc09a..96ac59b81216bc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.util.List; +import java.util.OptionalLong; import java.util.Set; public class MTMVPartitionUtilTest { @@ -112,7 +113,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getTableSnapshot((MTMVRefreshContext) any); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (OptionalLong) any); minTimes = 0; result = baseSnapshotIf; @@ -132,7 +133,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (OptionalLong) any); minTimes = 0; result = baseSnapshotIf;