Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,11 +487,34 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt
if (!baseTable.needAutoRefresh()) {
return true;
}
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, Optional.empty());
MTMVSnapshotIf baseTableCurrentSnapshot = getTableSnapshotFromContext(baseTable, context);
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot);
}

/**
* Try context first, then load via getTableSnapshot and cache
*
* @param mtmvRelatedTableIf Base table of materialized views
* @param context The context data persists for the duration of either a refresh task
* or a transparent rewrite operation
* @return The snapshot information of the MTMV
* @throws AnalysisException
*/
public static MTMVSnapshotIf getTableSnapshotFromContext(MTMVRelatedTableIf mtmvRelatedTableIf,
MTMVRefreshContext context)
throws AnalysisException {
BaseTableInfo baseTableInfo = new BaseTableInfo(mtmvRelatedTableIf);
Map<BaseTableInfo, MTMVSnapshotIf> baseTableSnapshotCache = context.getBaseTableSnapshotCache();
if (baseTableSnapshotCache.containsKey(baseTableInfo)) {
return baseTableSnapshotCache.get(baseTableInfo);
}
MTMVSnapshotIf baseTableCurrentSnapshot = mtmvRelatedTableIf.getTableSnapshot(context,
Optional.empty());
baseTableSnapshotCache.put(baseTableInfo, baseTableCurrentSnapshot);
return baseTableCurrentSnapshot;
}

/**
* Generate updated snapshots of partitions to determine if they are synchronized
*
Expand Down Expand Up @@ -538,7 +561,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres
continue;
}
refreshPartitionSnapshot.addTableSnapshot(baseTableInfo,
((MTMVRelatedTableIf) table).getTableSnapshot(context, Optional.empty()));
getTableSnapshotFromContext((MTMVRelatedTableIf) table, context));
}
return refreshPartitionSnapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.AnalysisException;

import com.google.common.collect.Maps;

import java.util.Map;
import java.util.Set;

public class MTMVRefreshContext {
private MTMV mtmv;
private Map<String, Set<String>> partitionMappings;
private MTMVBaseVersions baseVersions;
// Within the same context, repeated fetches of the same table's snapshot must return consistent values.
// Hence, the results are cached at this stage.
// The value is loaded/cached on the first fetch
private Map<BaseTableInfo, MTMVSnapshotIf> baseTableSnapshotCache = Maps.newHashMap();

public MTMVRefreshContext(MTMV mtmv) {
this.mtmv = mtmv;
Expand All @@ -44,6 +50,10 @@ public MTMVBaseVersions getBaseVersions() {
return baseVersions;
}

public Map<BaseTableInfo, MTMVSnapshotIf> getBaseTableSnapshotCache() {
return baseTableSnapshotCache;
}

public static MTMVRefreshContext buildContext(MTMV mtmv) throws AnalysisException {
MTMVRefreshContext context = new MTMVRefreshContext(mtmv);
context.partitionMappings = mtmv.calculatePartitionMappings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -92,6 +93,10 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = versions;

context.getBaseTableSnapshotCache();
minTimes = 0;
result = Maps.newHashMap();

mtmv.getPartitions();
minTimes = 0;
result = Lists.newArrayList(p1);
Expand Down Expand Up @@ -296,4 +301,20 @@ public void testIsTableNamelike() {
Assert.assertFalse(MTMVPartitionUtil.isTableNamelike(new TableName("db1"), tableNameToCheck));
Assert.assertFalse(MTMVPartitionUtil.isTableNamelike(new TableName("ctl1"), tableNameToCheck));
}

@Test
public void testGetTableSnapshotFromContext() throws AnalysisException {
Map<BaseTableInfo, MTMVSnapshotIf> cache = Maps.newHashMap();
new Expectations() {
{
context.getBaseTableSnapshotCache();
minTimes = 0;
result = cache;
}
};
Assert.assertTrue(cache.isEmpty());
MTMVPartitionUtil.getTableSnapshotFromContext(baseOlapTable, context);
Assert.assertEquals(1, cache.size());
Assert.assertEquals(baseSnapshotIf, cache.values().iterator().next());
}
}