Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](mtmv)Enable the MTMVRelatedTableIf interface to support mvcc #44419

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -363,7 +364,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems(OptionalLong.empty());
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
Expand Down Expand Up @@ -392,7 +393,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty());
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down Expand Up @@ -425,7 +426,7 @@ public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisExce
Map<String, Set<String>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty());
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -3260,7 +3261,7 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
Expand All @@ -3284,7 +3285,8 @@ public List<Column> getPartitionColumns() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
Expand All @@ -3294,7 +3296,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -748,7 +749,7 @@ public Set<String> getPartitionColumnNames() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
Expand All @@ -763,8 +764,8 @@ public Map<String, PartitionItem> getAndCopyPartitionItems() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId) throws AnalysisException {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
Expand All @@ -776,7 +777,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId)
throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -312,7 +313,7 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) {
return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
}

Expand All @@ -333,7 +334,8 @@ public List<Column> getPartitionColumns() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId)
throws AnalysisException {
PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
if (paimonPartition == null) {
Expand All @@ -343,7 +345,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId)
throws AnalysisException {
return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -329,7 +330,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt
}
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName, context);
.getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty());
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
Expand Down Expand Up @@ -446,7 +447,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt
if (!baseTable.needAutoRefresh()) {
return true;
}
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context);
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, OptionalLong.empty());
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot);
}
Expand Down Expand Up @@ -482,7 +483,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName, context);
.getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty());
refreshPartitionSnapshot.getPartitions()
.put(relatedPartitionName, partitionSnapshot);
}
Expand All @@ -497,7 +498,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres
continue;
}
refreshPartitionSnapshot.addTableSnapshot(baseTableInfo,
((MTMVRelatedTableIf) table).getTableSnapshot(context));
((MTMVRelatedTableIf) table).getTableSnapshot(context, OptionalLong.empty()));
}
return refreshPartitionSnapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.AnalysisException;

import java.util.Map;
import java.util.OptionalLong;

/**
* get all related partition descs
Expand All @@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems());
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(OptionalLong.empty()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/**
Expand All @@ -38,9 +39,10 @@ public interface MTMVRelatedTableIf extends TableIf {
* Note: This method is called every time there is a refresh and transparent rewrite,
* so if this method is slow, it will significantly reduce query performance
*
* @param snapshotId
* @return partitionName->PartitionItem
*/
Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException;
Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException;

/**
* getPartitionType LIST/RANGE/UNPARTITIONED
Expand Down Expand Up @@ -70,24 +72,27 @@ public interface MTMVRelatedTableIf extends TableIf {
* If snapshots have already been obtained in bulk in the context,
* the results should be obtained directly from the context
*
* @param snapshotId
* @param partitionName
* @param context
* @return partition snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException;
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, OptionalLong snapshotId)
throws AnalysisException;

/**
* getTableSnapshot
* It is best to use the version. If there is no version, use the last update time
* If snapshots have already been obtained in bulk in the context,
* the results should be obtained directly from the context
*
* @param snapshotId
* @param context
* @return table snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException;
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) throws AnalysisException;

/**
* Does the current type of table allow timed triggering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Test;

import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

public class MTMVPartitionUtilTest {
Expand Down Expand Up @@ -112,7 +113,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = true;

baseOlapTable.getTableSnapshot((MTMVRefreshContext) any);
baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (OptionalLong) any);
minTimes = 0;
result = baseSnapshotIf;

Expand All @@ -132,7 +133,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = true;

baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any);
baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (OptionalLong) any);
minTimes = 0;
result = baseSnapshotIf;

Expand Down
Loading