Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
ea2c3c7
1
zddr Oct 14, 2025
e08c923
2
zddr Oct 14, 2025
35cea48
Merge branch 'master' into mtmv_multi_pct_tables
zddr Oct 14, 2025
9877808
[improvement](mtmv) Support partition trace form multi join input or …
seawinde Sep 9, 2024
167adc2
Merge pull request #4 from zddr/support_pct_union_all
seawinde Oct 14, 2025
470a91c
call getRelatedTableInfos
zddr Oct 15, 2025
d562889
call getRelatedTableInfos
zddr Oct 15, 2025
e2b6735
fix
zddr Oct 15, 2025
b210415
fix
zddr Oct 15, 2025
60f9490
fix
zddr Oct 15, 2025
fd1a4e6
fix
zddr Oct 15, 2025
87105a6
compatible multi trace table
seawinde Oct 15, 2025
6c9dfca
compatible multi trace table
seawinde Oct 15, 2025
87cfeb0
Merge pull request #5 from zddr/support_pct_union_all
seawinde Oct 15, 2025
3167967
fix case
zddr Oct 16, 2025
1be567d
Merge remote-tracking branch 'origin/mtmv_multi_pct_tables' into mtmv…
zddr Oct 16, 2025
a868fcb
add case
zddr Oct 16, 2025
d78c306
add case
zddr Oct 16, 2025
0bffad2
add case
zddr Oct 16, 2025
c5903cd
add case
zddr Oct 16, 2025
6f02686
add case
zddr Oct 16, 2025
978eb9e
add case
zddr Oct 16, 2025
c952bd2
add case
zddr Oct 16, 2025
67e9e6f
1
zddr Oct 17, 2025
3dc7e96
1
zddr Oct 17, 2025
041ccb9
Merge remote-tracking branch 'origin/mtmv_multi_pct_tables' into mtmv…
zddr Oct 17, 2025
50a7b2b
support list
zddr Oct 20, 2025
8857d5b
support list
zddr Oct 20, 2025
f1c83f6
add case
zddr Oct 20, 2025
7fafd17
add case
zddr Oct 20, 2025
49edf2b
a
zddr Oct 20, 2025
4f8c88a
Merge branch 'master' into mtmv_multi_pct_tables
zddr Oct 20, 2025
3abacb8
Merge remote-tracking branch 'origin/mtmv_multi_pct_tables' into mtmv…
zddr Oct 20, 2025
227a674
support list
zddr Oct 20, 2025
dcb3fdd
delete filteded tables
zddr Oct 20, 2025
1eac21e
delete filteded tables
zddr Oct 20, 2025
ef03f61
delete filteded tables
zddr Oct 20, 2025
cba7dc4
add case
zddr Oct 20, 2025
11c2a5b
add case
zddr Oct 20, 2025
07c30d3
add case
zddr Oct 21, 2025
49ec6db
1
zddr Oct 21, 2025
ddc487f
add case
zddr Oct 21, 2025
f8beb46
1
zddr Oct 21, 2025
32d8373
add case
zddr Oct 21, 2025
1b9d209
add case
zddr Oct 21, 2025
c59173f
add case
zddr Oct 21, 2025
85162e7
support with and fix bug
seawinde Oct 21, 2025
d7a48e6
Merge pull request #6 from zddr/support_pct_union_all
seawinde Oct 21, 2025
9d04ee2
add case
zddr Oct 21, 2025
51e0581
Merge remote-tracking branch 'origin/mtmv_multi_pct_tables' into mtmv…
zddr Oct 21, 2025
63c1e4f
add case
zddr Oct 21, 2025
30a8953
add case
zddr Oct 21, 2025
e9c583d
add case
zddr Oct 21, 2025
0eb3292
add case
zddr Oct 21, 2025
8f95d54
add case
zddr Oct 21, 2025
c39f5b6
add case
zddr Oct 21, 2025
59ad641
add case
zddr Oct 21, 2025
5b63869
add case
zddr Oct 21, 2025
63611ae
1
zddr Oct 21, 2025
e3852ff
add case
zddr Oct 21, 2025
4e0a766
Merge remote-tracking branch 'origin/mtmv_multi_pct_tables' into mtmv…
zddr Oct 21, 2025
036ba4f
add case
zddr Oct 21, 2025
aa8b0c8
add case
zddr Oct 21, 2025
0391cea
1
zddr Oct 21, 2025
1f60457
add case
zddr Oct 22, 2025
af599cb
fix ut
zddr Oct 22, 2025
ac22c40
add case
zddr Oct 22, 2025
3b226bd
1
zddr Oct 22, 2025
a8154d3
add case
zddr Oct 22, 2025
9196dca
fix regression test and add regression test
seawinde Oct 22, 2025
c3e8859
Merge pull request #7 from zddr/support_pct_union_all
seawinde Oct 22, 2025
9bbabf2
fix p0
zddr Oct 22, 2025
b8fcfdc
Merge remote-tracking branch 'origin/mtmv_multi_pct_tables' into mtmv…
zddr Oct 22, 2025
ed92336
fix p0
zddr Oct 22, 2025
71189ae
add case
zddr Oct 22, 2025
c5e5a7d
add case
zddr Oct 22, 2025
bbaa6fa
add case
zddr Oct 22, 2025
81a576e
1
zddr Oct 22, 2025
1eff8e3
wait to merge1
seawinde Oct 22, 2025
2f4890f
wait to merge2
seawinde Oct 22, 2025
04e2a66
Merge pull request #8 from zddr/support_pct_union_all
seawinde Oct 22, 2025
7832081
fix external
zddr Oct 23, 2025
4a7c96d
fix test
seawinde Oct 24, 2025
0e43d2b
Merge pull request #9 from zddr/support_pct_union_all
seawinde Oct 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 36 additions & 45 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
Expand All @@ -40,17 +40,21 @@
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -242,6 +246,7 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
this.jobInfo.addHistoryTask(task);
compatiblePctSnapshot(partitionSnapshots);
this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames());
Env.getCurrentEnv().getMtmvService()
.refreshComplete(this, relation, task);
Expand Down Expand Up @@ -397,64 +402,26 @@ public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
return result;
}

/**
* Calculate the partition and associated partition mapping relationship of the MTMV
* It is the result of real-time comparison calculation, so there may be some costs,
* so it should be called with caution.
* The reason for not directly calling `calculatePartitionMappings` and
* generating a reverse index is to directly generate two maps here,
* without the need to traverse them again
*
* @return mvPartitionName ==> relationPartitionNames and relationPartitionName ==> mvPartitionName
* @throws AnalysisException
*/
public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartitionMappings()
throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Pair.of(Maps.newHashMap(), Maps.newHashMap());
}
long start = System.currentTimeMillis();
Map<String, Set<String>> mvToBase = Maps.newHashMap();
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
String mvPartitionName = entry.getKey();
mvToBase.put(mvPartitionName, basePartitionNames);
for (String basePartitionName : basePartitionNames) {
baseToMv.put(basePartitionName, mvPartitionName);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("calculateDoublyPartitionMappings use [{}] mills, mvName is [{}]",
System.currentTimeMillis() - start, name);
}
return Pair.of(mvToBase, baseToMv);
}

/**
* Calculate the partition and associated partition mapping relationship of the MTMV
* It is the result of real-time comparison calculation, so there may be some costs,
* so it should be called with caution
*
* @return mvPartitionName ==> relationPartitionNames
* @return mvPartitionName ==> pctTable ==> pctPartitionName
* @throws AnalysisException
*/
public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisException {
public Map<String, Map<MTMVRelatedTableIf, Set<String>>> calculatePartitionMappings() throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
long start = System.currentTimeMillis();
Map<String, Set<String>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, Map<MTMVRelatedTableIf, Set<String>>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Map<MTMVRelatedTableIf, Set<String>>> pctPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties, getPartitionColumns());
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
pctPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Maps.newHashMap()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("calculatePartitionMappings use [{}] mills, mvName is [{}]",
Expand Down Expand Up @@ -580,4 +547,28 @@ private void compatibleInternal(CatalogMgr catalogMgr) throws Exception {
refreshSnapshot.compatible(this);
}
}

@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = refreshSnapshot.getPartitionSnapshots();
compatiblePctSnapshot(partitionSnapshots);
}

private void compatiblePctSnapshot(Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
BaseTableInfo relatedTableInfo = mvPartitionInfo.getRelatedTableInfo();
if (relatedTableInfo == null) {
return;
}
if (MapUtils.isEmpty(partitionSnapshots)) {
return;
}
for (MTMVRefreshPartitionSnapshot partitionSnapshot : partitionSnapshots.values()) {
Map<String, MTMVSnapshotIf> partitions = partitionSnapshot.getPartitions();
Map<BaseTableInfo, Map<String, MTMVSnapshotIf>> pcts = partitionSnapshot.getPcts();
if (!MapUtils.isEmpty(partitions) && MapUtils.isEmpty(pcts)) {
pcts.put(relatedTableInfo, partitions);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3418,7 +3418,7 @@ public List<Column> getPartitionColumns() {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions(this);
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
Expand Down Expand Up @@ -209,12 +210,14 @@ public void run() throws JobException {
MTMVPlanUtil.ensureMTMVQueryUsable(mtmv, ctx);
}
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
if (!relatedTable.isValidRelatedTable()) {
throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName()
+ " is not a valid related table anymore, stop refreshing."
+ " e.g. Table has multiple partition columns"
+ " or including not supported transform functions.");
Set<MTMVRelatedTableIf> pctTables = mtmv.getMvPartitionInfo().getPctTables();
for (MTMVRelatedTableIf pctTable : pctTables) {
if (!pctTable.isValidRelatedTable()) {
throw new JobException("MTMV " + mtmv.getName() + "'s pct table " + pctTable.getName()
+ " is not a valid pct table anymore, stop refreshing."
+ " e.g. Table has multiple partition columns"
+ " or including not supported transform functions.");
}
}
syncPartitions = MTMVPartitionUtil.alignMvPartition(mtmv);
}
Expand Down Expand Up @@ -578,8 +581,11 @@ protected void closeOrReleaseResources() {
private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException {
Map<TableIf, String> tableWithPartKey = Maps.newHashMap();
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
tableWithPartKey
.put(mtmv.getMvPartitionInfo().getRelatedTable(), mtmv.getMvPartitionInfo().getRelatedCol());
List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
for (BaseColInfo pctInfo : pctInfos) {
tableWithPartKey
.put(MTMVUtil.getTable(pctInfo.getTableInfo()), pctInfo.getColName());
}
}
return tableWithPartKey;
}
Expand Down
73 changes: 73 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseColInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv;

import com.google.gson.annotations.SerializedName;

import java.util.Objects;

public class BaseColInfo {
@SerializedName("ti")
private BaseTableInfo tableInfo;
@SerializedName("rn")
private String colName;

public BaseColInfo(String colName, BaseTableInfo tableInfo) {
this.colName = colName;
this.tableInfo = tableInfo;
}

public String getColName() {
return colName;
}

public void setColName(String colName) {
this.colName = colName;
}

public BaseTableInfo getTableInfo() {
return tableInfo;
}

public void setTableInfo(BaseTableInfo tableInfo) {
this.tableInfo = tableInfo;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
BaseColInfo that = (BaseColInfo) o;
return Objects.equals(tableInfo, that.tableInfo) && Objects.equals(colName, that.colName);
}

@Override
public int hashCode() {
return Objects.hash(tableInfo, colName);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BaseColInfo{");
sb.append("colName='").append(colName).append('\'');
sb.append(", tableInfo=").append(tableInfo);
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.doris.mtmv;

import com.google.common.collect.Maps;

import java.util.Map;

public class MTMVBaseVersions {
private final Map<Long, Long> tableVersions;
private final Map<String, Long> partitionVersions;
private final Map<MTMVRelatedTableIf, Map<String, Long>> partitionVersions;

public MTMVBaseVersions(Map<Long, Long> tableVersions, Map<String, Long> partitionVersions) {
public MTMVBaseVersions(Map<Long, Long> tableVersions,
Map<MTMVRelatedTableIf, Map<String, Long>> partitionVersions) {
this.tableVersions = tableVersions;
this.partitionVersions = partitionVersions;
}
Expand All @@ -32,7 +35,7 @@ public Map<Long, Long> getTableVersions() {
return tableVersions;
}

public Map<String, Long> getPartitionVersions() {
return partitionVersions;
public Map<String, Long> getPartitionVersions(MTMVRelatedTableIf mtmvRelatedTableIf) {
return partitionVersions.getOrDefault(mtmvRelatedTableIf, Maps.newHashMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static Pair<Boolean, String> compareDynamicPartition(OlapTable originalTa
@VisibleForTesting
public static Pair<Boolean, String> compareAutoPartition(OlapTable originalTable,
OlapTable relatedTable) throws AnalysisException {
if (!isDynamicPartition(relatedTable)) {
if (!isAutoPartition(relatedTable)) {
return Pair.of(false, "relatedTable is not dynamic partition.");
}
FunctionIntervalInfo originalFunctionIntervalInfo = PartitionExprUtil.getFunctionIntervalInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,21 @@ public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException
throw new AnalysisException(
String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits));
}
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
PartitionType partitionType = relatedTable.getPartitionType(MvccUtil.getSnapshotFromContext(relatedTable));
if (partitionType == PartitionType.RANGE) {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
if (!partitionColumnType.isDateType()) {
throw new AnalysisException(
"partitionColumnType should be date/datetime "
+ "when PartitionType is range and expr is date_trunc");
List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
for (BaseColInfo pctInfo : pctInfos) {
MTMVRelatedTableIf pctTable = MTMVUtil.getRelatedTable(pctInfo.getTableInfo());
PartitionType partitionType = pctTable.getPartitionType(MvccUtil.getSnapshotFromContext(pctTable));
if (partitionType == PartitionType.RANGE) {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(pctTable, pctInfo.getColName());
if (!partitionColumnType.isDateType()) {
throw new AnalysisException(
"partitionColumnType should be date/datetime "
+ "when PartitionType is range and expr is date_trunc");
}
} else {
throw new AnalysisException("date_trunc only support range partition");
}
} else {
throw new AnalysisException("date_trunc only support range partition");
}
}

Expand Down Expand Up @@ -125,9 +128,9 @@ private Optional<String> getDateFormat(Map<String, String> mvProperties) {

@Override
public PartitionKeyDesc generateRollUpPartitionKeyDesc(PartitionKeyDesc partitionKeyDesc,
MTMVPartitionInfo mvPartitionInfo) throws AnalysisException {
MTMVPartitionInfo mvPartitionInfo, MTMVRelatedTableIf pctTable) throws AnalysisException {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
.getPartitionColumnType(pctTable, mvPartitionInfo.getPartitionColByPctTable(pctTable));
// mtmv only support one partition column
Preconditions.checkState(partitionKeyDesc.getLowerValues().size() == 1,
"only support one partition column");
Expand Down Expand Up @@ -243,4 +246,18 @@ private String dateTimeToStr(DateTimeV2Literal literal,
"MTMV not support partition with column type : " + partitionColumnType);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
MTMVPartitionExprDateTrunc that = (MTMVPartitionExprDateTrunc) o;
return Objects.equals(timeUnit, that.timeUnit);
}

@Override
public int hashCode() {
return Objects.hashCode(timeUnit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ String getRollUpIdentity(PartitionKeyDesc partitionKeyDesc, Map<String, String>
* @throws AnalysisException
*/
PartitionKeyDesc generateRollUpPartitionKeyDesc(
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo)
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelatedTableIf pctTable)
throws AnalysisException;

/**
Expand Down
Loading